Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-31 01:00:36 +00:00
Compare commits: bayandin/d...skyzh/immu (19 commits)
| Author | SHA1 | Date |
|---|---|---|
| | 1863ae799d | |
| | 20fe57d93b | |
| | 0fad5e21ce | |
| | a2056666ae | |
| | a3909e03f8 | |
| | fc190a2a19 | |
| | faee3152f3 | |
| | 3693d1f431 | |
| | fdf7a67ed2 | |
| | 1299df87d2 | |
| | 754ceaefac | |
| | 143fa0da42 | |
| | 4936ab6842 | |
| | 939593d0d3 | |
| | 2011cc05cd | |
| | b0286e3c46 | |
| | e4f05ce0a2 | |
| | 8d106708d7 | |
| | f450369b20 | |
7 Cargo.lock generated
@@ -110,6 +110,12 @@ dependencies = [
  "backtrace",
 ]
 
+[[package]]
+name = "arc-swap"
+version = "1.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6"
+
 [[package]]
 name = "archery"
 version = "0.5.0"

@@ -2542,6 +2548,7 @@ name = "pageserver"
 version = "0.1.0"
 dependencies = [
  "anyhow",
+ "arc-swap",
  "async-stream",
  "async-trait",
  "byteorder",

@@ -32,6 +32,7 @@ license = "Apache-2.0"
 
 ## All dependency versions, used in the project
 [workspace.dependencies]
 anyhow = { version = "1.0", features = ["backtrace"] }
+arc-swap = "1.6"
 async-stream = "0.3"
 async-trait = "0.1"
 atty = "0.2.14"
@@ -370,11 +370,6 @@ impl ComputeNode {
         // 'Close' connection
         drop(client);
 
-        info!(
-            "finished configuration of compute for project {}",
-            spec.cluster.cluster_id.as_deref().unwrap_or("None")
-        );
-
         Ok(())
     }

@@ -427,22 +422,22 @@ impl ComputeNode {
     #[instrument(skip(self))]
     pub fn start_compute(&self) -> Result<std::process::Child> {
         let compute_state = self.state.lock().unwrap().clone();
-        let spec = compute_state.pspec.as_ref().expect("spec must be set");
+        let pspec = compute_state.pspec.as_ref().expect("spec must be set");
         info!(
             "starting compute for project {}, operation {}, tenant {}, timeline {}",
-            spec.spec.cluster.cluster_id.as_deref().unwrap_or("None"),
-            spec.spec.operation_uuid.as_deref().unwrap_or("None"),
-            spec.tenant_id,
-            spec.timeline_id,
+            pspec.spec.cluster.cluster_id.as_deref().unwrap_or("None"),
+            pspec.spec.operation_uuid.as_deref().unwrap_or("None"),
+            pspec.tenant_id,
+            pspec.timeline_id,
         );
 
         self.prepare_pgdata(&compute_state)?;
 
         let start_time = Utc::now();
 
-        let pg = self.start_postgres(spec.storage_auth_token.clone())?;
+        let pg = self.start_postgres(pspec.storage_auth_token.clone())?;
 
-        if spec.spec.mode == ComputeMode::Primary {
+        if pspec.spec.mode == ComputeMode::Primary && !pspec.spec.skip_pg_catalog_updates {
             self.apply_config(&compute_state)?;
         }

@@ -462,6 +457,11 @@ impl ComputeNode {
         }
         self.set_status(ComputeStatus::Running);
 
+        info!(
+            "finished configuration of compute for project {}",
+            pspec.spec.cluster.cluster_id.as_deref().unwrap_or("None")
+        );
+
         Ok(pg)
     }
@@ -450,6 +450,7 @@ impl Endpoint {
 
         // Create spec file
         let spec = ComputeSpec {
+            skip_pg_catalog_updates: false,
             format_version: 1.0,
             operation_uuid: None,
             cluster: Cluster {
@@ -27,6 +27,12 @@ pub struct ComputeSpec {
     pub cluster: Cluster,
     pub delta_operations: Option<Vec<DeltaOp>>,
 
+    /// An optinal hint that can be passed to speed up startup time if we know
+    /// that no pg catalog mutations (like role creation, database creation,
+    /// extension creation) need to be done on the actual database to start.
+    #[serde(default)] // Default false
+    pub skip_pg_catalog_updates: bool,
+
     // Information needed to connect to the storage layer.
     //
     // `tenant_id`, `timeline_id` and `pageserver_connstring` are always needed.
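The `#[serde(default)]` attribute is what keeps old specs, which lack the new field, deserializable. A minimal self-contained sketch of that behavior, assuming serde (with the derive feature) and serde_json as dependencies; `Spec` is a stand-in for the real `ComputeSpec`:

use serde::Deserialize;

#[derive(Deserialize, Debug)]
struct Spec {
    format_version: f32,
    // Absent in old JSON specs; deserializes to `false` instead of erroring.
    #[serde(default)]
    skip_pg_catalog_updates: bool,
}

fn main() {
    // An old spec without the new field still parses.
    let old: Spec = serde_json::from_str(r#"{ "format_version": 1.0 }"#).unwrap();
    assert!(!old.skip_pg_catalog_updates);

    // A new spec can opt in to the fast path.
    let new: Spec =
        serde_json::from_str(r#"{ "format_version": 1.0, "skip_pg_catalog_updates": true }"#)
            .unwrap();
    assert!(new.skip_pg_catalog_updates);
    println!("{old:?} {new:?}");
}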
@@ -12,6 +12,7 @@ testing = ["fail/failpoints"]
 
 [dependencies]
 anyhow.workspace = true
+arc-swap.workspace = true
 async-stream.workspace = true
 async-trait.workspace = true
 byteorder.workspace = true
@@ -1,22 +1,23 @@
 use pageserver::keyspace::{KeyPartitioning, KeySpace};
 use pageserver::repository::Key;
 use pageserver::tenant::layer_map::LayerMap;
-use pageserver::tenant::storage_layer::{Layer, LayerDescriptor, LayerFileName};
+use pageserver::tenant::storage_layer::{tests::LayerDescriptor, Layer, LayerFileName};
+use pageserver::tenant::storage_layer::{PersistentLayer, PersistentLayerDesc};
 use rand::prelude::{SeedableRng, SliceRandom, StdRng};
 use std::cmp::{max, min};
 use std::fs::File;
 use std::io::{BufRead, BufReader};
 use std::path::PathBuf;
 use std::str::FromStr;
 use std::sync::Arc;
 use std::time::Instant;
+use utils::id::{TenantId, TimelineId};
 
 use utils::lsn::Lsn;
 
 use criterion::{black_box, criterion_group, criterion_main, Criterion};
 
-fn build_layer_map(filename_dump: PathBuf) -> LayerMap<LayerDescriptor> {
-    let mut layer_map = LayerMap::<LayerDescriptor>::default();
+fn build_layer_map(filename_dump: PathBuf) -> LayerMap {
+    let mut layer_map = LayerMap::default();
 
     let mut min_lsn = Lsn(u64::MAX);
     let mut max_lsn = Lsn(0);

@@ -33,7 +34,7 @@ fn build_layer_map(filename_dump: PathBuf) -> LayerMap {
         min_lsn = min(min_lsn, lsn_range.start);
         max_lsn = max(max_lsn, Lsn(lsn_range.end.0 - 1));
 
-        updates.insert_historic(layer.get_persistent_layer_desc(), Arc::new(layer));
+        updates.insert_historic(layer.layer_desc().clone());
     }
 
     println!("min: {min_lsn}, max: {max_lsn}");

@@ -43,7 +44,7 @@ fn build_layer_map(filename_dump: PathBuf) -> LayerMap {
 }
 
 /// Construct a layer map query pattern for benchmarks
-fn uniform_query_pattern(layer_map: &LayerMap<LayerDescriptor>) -> Vec<(Key, Lsn)> {
+fn uniform_query_pattern(layer_map: &LayerMap) -> Vec<(Key, Lsn)> {
     // For each image layer we query one of the pages contained, at LSN right
     // before the image layer was created. This gives us a somewhat uniform
     // coverage of both the lsn and key space because image layers have

@@ -69,7 +70,7 @@ fn uniform_query_pattern(layer_map: &LayerMap) -> Vec<(Key, Lsn)>
 
 // Construct a partitioning for testing get_difficulty map when we
 // don't have an exact result of `collect_keyspace` to work with.
-fn uniform_key_partitioning(layer_map: &LayerMap<LayerDescriptor>, _lsn: Lsn) -> KeyPartitioning {
+fn uniform_key_partitioning(layer_map: &LayerMap, _lsn: Lsn) -> KeyPartitioning {
     let mut parts = Vec::new();
 
     // We add a partition boundary at the start of each image layer,

@@ -209,13 +210,15 @@ fn bench_sequential(c: &mut Criterion) {
     for i in 0..100_000 {
         let i32 = (i as u32) % 100;
         let zero = Key::from_hex("000000000000000000000000000000000000").unwrap();
-        let layer = LayerDescriptor {
-            key: zero.add(10 * i32)..zero.add(10 * i32 + 1),
-            lsn: Lsn(i)..Lsn(i + 1),
-            is_incremental: false,
-            short_id: format!("Layer {}", i),
-        };
-        updates.insert_historic(layer.get_persistent_layer_desc(), Arc::new(layer));
+        let layer = LayerDescriptor::from(PersistentLayerDesc::new_img(
+            TenantId::generate(),
+            TimelineId::generate(),
+            zero.add(10 * i32)..zero.add(10 * i32 + 1),
+            Lsn(i),
+            false,
+            0,
+        ));
+        updates.insert_historic(layer.layer_desc().clone());
     }
     updates.flush();
     println!("Finished layer map init in {:?}", now.elapsed());
@@ -516,7 +516,7 @@ async fn collect_eviction_candidates(
         if !tl.is_active() {
             continue;
         }
-        let info = tl.get_local_layers_for_disk_usage_eviction();
+        let info = tl.get_local_layers_for_disk_usage_eviction().await;
         debug!(tenant_id=%tl.tenant_id, timeline_id=%tl.timeline_id, "timeline resident layers count: {}", info.resident_layers.len());
         tenant_candidates.extend(
             info.resident_layers
@@ -215,7 +215,7 @@ async fn build_timeline_info(
 ) -> anyhow::Result<TimelineInfo> {
     crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
 
-    let mut info = build_timeline_info_common(timeline, ctx)?;
+    let mut info = build_timeline_info_common(timeline, ctx).await?;
     if include_non_incremental_logical_size {
         // XXX we should be using spawn_ondemand_logical_size_calculation here.
         // Otherwise, if someone deletes the timeline / detaches the tenant while

@@ -233,7 +233,7 @@ async fn build_timeline_info(
     Ok(info)
 }
 
-fn build_timeline_info_common(
+async fn build_timeline_info_common(
     timeline: &Arc<Timeline>,
     ctx: &RequestContext,
 ) -> anyhow::Result<TimelineInfo> {

@@ -264,7 +264,7 @@ fn build_timeline_info_common(
             None
         }
     };
-    let current_physical_size = Some(timeline.layer_size_sum());
+    let current_physical_size = Some(timeline.layer_size_sum().await);
     let state = timeline.current_state();
     let remote_consistent_lsn = timeline.get_remote_consistent_lsn().unwrap_or(Lsn(0));
 

@@ -330,6 +330,7 @@ async fn timeline_create_handler(
         Ok(Some(new_timeline)) => {
             // Created. Construct a TimelineInfo for it.
             let timeline_info = build_timeline_info_common(&new_timeline, &ctx)
+                .await
                 .map_err(ApiError::InternalServerError)?;
             json_response(StatusCode::CREATED, timeline_info)
         }

@@ -591,7 +592,7 @@ async fn tenant_status(
     // Calculate total physical size of all timelines
     let mut current_physical_size = 0;
     for timeline in tenant.list_timelines().iter() {
-        current_physical_size += timeline.layer_size_sum();
+        current_physical_size += timeline.layer_size_sum().await;
     }
 
     let state = tenant.current_state();

@@ -701,7 +702,7 @@ async fn layer_map_info_handler(
     check_permission(&request, Some(tenant_id))?;
 
     let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
-    let layer_map_info = timeline.layer_map_info(reset);
+    let layer_map_info = timeline.layer_map_info(reset).await;
 
     json_response(StatusCode::OK, layer_map_info)
 }
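The hunks above all show the same refactoring pressure: once a leaf call such as `layer_size_sum()` becomes async, every caller must await it and become async itself, all the way up to the handler. A minimal sketch of that propagation, assuming tokio as the runtime; the names are stand-ins, not the pageserver API:

// Stand-in for a call that became async (e.g. an async layer-map read).
async fn layer_size_sum() -> u64 {
    42
}

// Before the change this could be a plain `fn`; now it has to be `async`
// and await the leaf call, which in turn forces `.await` on its callers.
async fn build_info() -> u64 {
    layer_size_sum().await
}

#[tokio::main]
async fn main() {
    println!("{}", build_info().await);
}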
@@ -75,12 +75,12 @@ pub async fn import_timeline_from_postgres_datadir(
             {
                 pg_control = Some(control_file);
             }
-            modification.flush()?;
+            modification.flush().await?;
         }
     }
 
     // We're done importing all the data files.
-    modification.commit()?;
+    modification.commit().await?;
 
     // We expect the Postgres server to be shut down cleanly.
     let pg_control = pg_control.context("pg_control file not found")?;

@@ -359,7 +359,7 @@ pub async fn import_basebackup_from_tar(
                     // We found the pg_control file.
                     pg_control = Some(res);
                 }
-                modification.flush()?;
+                modification.flush().await?;
             }
             tokio_tar::EntryType::Directory => {
                 debug!("directory {:?}", file_path);

@@ -377,7 +377,7 @@ pub async fn import_basebackup_from_tar(
     // sanity check: ensure that pg_control is loaded
     let _pg_control = pg_control.context("pg_control file not found")?;
 
-    modification.commit()?;
+    modification.commit().await?;
     Ok(())
 }
@@ -594,7 +594,7 @@ async fn import_file(
         // zenith.signal is not necessarily the last file, that we handle
         // but it is ok to call `finish_write()`, because final `modification.commit()`
         // will update lsn once more to the final one.
-        let writer = modification.tline.writer();
+        let writer = modification.tline.writer().await;
         writer.finish_write(prev_lsn);
 
         debug!("imported zenith signal {}", prev_lsn);
@@ -489,7 +489,9 @@ impl PageServerHandler {
         // Create empty timeline
         info!("creating new timeline");
         let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?;
-        let timeline = tenant.create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)?;
+        let timeline = tenant
+            .create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)
+            .await?;
 
         // TODO mark timeline as not ready until it reaches end_lsn.
         // We might have some wal to import as well, and we should prevent compute
@@ -1122,7 +1122,7 @@ impl<'a> DatadirModification<'a> {
     /// retains all the metadata, but data pages are flushed. That's again OK
     /// for bulk import, where you are just loading data pages and won't try to
     /// modify the same pages twice.
-    pub fn flush(&mut self) -> anyhow::Result<()> {
+    pub async fn flush(&mut self) -> anyhow::Result<()> {
         // Unless we have accumulated a decent amount of changes, it's not worth it
         // to scan through the pending_updates list.
         let pending_nblocks = self.pending_nblocks;

@@ -1130,19 +1130,20 @@ impl<'a> DatadirModification<'a> {
             return Ok(());
         }
 
-        let writer = self.tline.writer();
+        let writer = self.tline.writer().await;
 
         // Flush relation and SLRU data blocks, keep metadata.
-        let mut result: anyhow::Result<()> = Ok(());
-        self.pending_updates.retain(|&key, value| {
-            if result.is_ok() && (is_rel_block_key(key) || is_slru_block_key(key)) {
-                result = writer.put(key, self.lsn, value);
-                false
-            } else {
-                true
-            }
-        });
-        result?;
+        let mut retained_pending_updates = HashMap::new();
+        for (key, value) in self.pending_updates.drain() {
+            if is_rel_block_key(key) || is_slru_block_key(key) {
+                // This bails out on first error without modifying pending_updates.
+                // That's Ok, cf this function's doc comment.
+                writer.put(key, self.lsn, &value).await?;
+            } else {
+                retained_pending_updates.insert(key, value);
+            }
+        }
+        self.pending_updates.extend(retained_pending_updates);
 
         if pending_nblocks != 0 {
             writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));

@@ -1157,17 +1158,17 @@ impl<'a> DatadirModification<'a> {
     /// underlying timeline.
     /// All the modifications in this atomic update are stamped by the specified LSN.
     ///
-    pub fn commit(&mut self) -> anyhow::Result<()> {
-        let writer = self.tline.writer();
+    pub async fn commit(&mut self) -> anyhow::Result<()> {
+        let writer = self.tline.writer().await;
         let lsn = self.lsn;
         let pending_nblocks = self.pending_nblocks;
         self.pending_nblocks = 0;
 
         for (key, value) in self.pending_updates.drain() {
-            writer.put(key, lsn, &value)?;
+            writer.put(key, lsn, &value).await?;
         }
         for key_range in self.pending_deletions.drain(..) {
-            writer.delete(key_range, lsn)?;
+            writer.delete(key_range, lsn).await?;
        }
 
         writer.finish_write(lsn);
 
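The `retain` rewrite above is forced by the async conversion: `HashMap::retain` takes a synchronous closure, so `writer.put(...).await` cannot live inside it. Draining the map, awaiting per entry, and re-inserting the survivors is the standard workaround. A self-contained sketch of the same pattern, assuming tokio and anyhow as dependencies; `flush_one` is a stand-in for `writer.put(...)`:

use std::collections::HashMap;

async fn flush_one(key: u32, value: &str) -> anyhow::Result<()> {
    println!("flushing {key} => {value}");
    Ok(())
}

async fn flush(pending: &mut HashMap<u32, String>) -> anyhow::Result<()> {
    let mut retained = HashMap::new();
    for (key, value) in pending.drain() {
        if key % 2 == 0 {
            // Bails out on first error without re-inserting the survivors,
            // mirroring the "that's Ok" caveat in the hunk above.
            flush_one(key, &value).await?;
        } else {
            retained.insert(key, value);
        }
    }
    pending.extend(retained);
    Ok(())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut pending: HashMap<u32, String> =
        [(1, "a".into()), (2, "b".into())].into_iter().collect();
    flush(&mut pending).await?;
    assert_eq!(pending.len(), 1); // the odd key was retained
    Ok(())
}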
@@ -11,7 +11,7 @@
 //! parent timeline, and the last LSN that has been written to disk.
 //!
 
-use anyhow::{bail, Context};
+use anyhow::{bail, ensure, Context};
 use futures::FutureExt;
 use pageserver_api::models::TimelineState;
 use remote_storage::DownloadError;

@@ -85,7 +85,9 @@ pub mod blob_io;
 pub mod block_io;
 pub mod disk_btree;
 pub(crate) mod ephemeral_file;
+pub mod layer_cache;
 pub mod layer_map;
+pub mod layer_map_mgr;
 pub mod manifest;
 
 pub mod metadata;
@@ -186,18 +188,13 @@ struct TimelineUninitMark {
 }
 
 impl UninitializedTimeline<'_> {
-    /// Ensures timeline data is valid, loads it into pageserver's memory and removes
-    /// uninit mark file on success.
+    /// Finish timeline creation: insert it into the Tenant's timelines map and remove the
+    /// uninit mark file.
+    ///
+    /// This function launches the flush loop if not already done.
+    ///
+    /// The caller is responsible for activating the timeline (function `.activate()`).
-    fn initialize_with_lock(
-        mut self,
-        _ctx: &RequestContext,
-        timelines: &mut HashMap<TimelineId, Arc<Timeline>>,
-        load_layer_map: bool,
-    ) -> anyhow::Result<Arc<Timeline>> {
+    fn finish_creation(mut self) -> anyhow::Result<Arc<Timeline>> {
         let timeline_id = self.timeline_id;
         let tenant_id = self.owning_tenant.tenant_id;
 
@@ -205,25 +202,19 @@ impl UninitializedTimeline<'_> {
             format!("No timeline for initalization found for {tenant_id}/{timeline_id}")
         })?;
 
+        // Check that the caller initialized disk_consistent_lsn
         let new_disk_consistent_lsn = new_timeline.get_disk_consistent_lsn();
-        // TODO it would be good to ensure that, but apparently a lot of our testing is dependend on that at least
-        // ensure!(new_disk_consistent_lsn.is_valid(),
-        //     "Timeline {tenant_id}/{timeline_id} has invalid disk_consistent_lsn and cannot be initialized");
+        ensure!(
+            new_disk_consistent_lsn.is_valid(),
+            "new timeline {tenant_id}/{timeline_id} has invalid disk_consistent_lsn"
+        );
 
+        let mut timelines = self.owning_tenant.timelines.lock().unwrap();
         match timelines.entry(timeline_id) {
             Entry::Occupied(_) => anyhow::bail!(
                 "Found freshly initialized timeline {tenant_id}/{timeline_id} in the tenant map"
             ),
             Entry::Vacant(v) => {
-                if load_layer_map {
-                    new_timeline
-                        .load_layer_map(new_disk_consistent_lsn)
-                        .with_context(|| {
-                            format!(
-                                "Failed to load layermap for timeline {tenant_id}/{timeline_id}"
-                            )
-                        })?;
-                }
                 uninit_mark.remove_uninit_mark().with_context(|| {
                     format!(
                         "Failed to remove uninit mark file for timeline {tenant_id}/{timeline_id}"
@@ -252,9 +243,10 @@ impl UninitializedTimeline<'_> {
             .await
             .context("Failed to import basebackup")?;
 
+        // Flush the new layer files to disk, before we make the timeline as available to
+        // the outside world.
+        //
         // Flush loop needs to be spawned in order to be able to flush.
-        // We want to run proper checkpoint before we mark timeline as available to outside world
-        // Thus spawning flush loop manually and skipping flush_loop setup in initialize_with_lock
         raw_timeline.maybe_spawn_flush_loop();
 
         fail::fail_point!("before-checkpoint-new-timeline", |_| {

@@ -266,10 +258,9 @@ impl UninitializedTimeline<'_> {
             .await
             .context("Failed to flush after basebackup import")?;
 
-        // Initialize without loading the layer map. We started with an empty layer map, and already
-        // updated it for the layers that we created during the import.
-        let mut timelines = self.owning_tenant.timelines.lock().unwrap();
-        let tl = self.initialize_with_lock(ctx, &mut timelines, false)?;
+        // All the data has been imported. Insert the Timeline into the tenant's timelines
+        // map and remove the uninit mark file.
+        let tl = self.finish_creation()?;
         tl.activate(broker_client, None, ctx);
         Ok(tl)
     }
@@ -312,15 +303,6 @@ fn cleanup_timeline_directory(uninit_mark: TimelineUninitMark) {
 }
 
 impl TimelineUninitMark {
-    /// Useful for initializing timelines, existing on disk after the restart.
-    pub fn dummy() -> Self {
-        Self {
-            uninit_mark_deleted: true,
-            uninit_mark_path: PathBuf::new(),
-            timeline_path: PathBuf::new(),
-        }
-    }
-
     fn new(uninit_mark_path: PathBuf, timeline_path: PathBuf) -> Self {
         Self {
             uninit_mark_deleted: false,
@@ -514,7 +496,7 @@ impl Tenant {
         ancestor: Option<Arc<Timeline>>,
         first_save: bool,
         init_order: Option<&InitializationOrder>,
-        ctx: &RequestContext,
+        _ctx: &RequestContext,
     ) -> anyhow::Result<()> {
         let tenant_id = self.tenant_id;
 
@@ -525,54 +507,39 @@ impl Tenant {
             .context("merge_local_remote_metadata")?
             .to_owned();
 
-        let timeline = {
+        let timeline = self.create_timeline_struct(
+            timeline_id,
+            up_to_date_metadata,
+            ancestor.clone(),
+            remote_client,
+            init_order,
+        )?;
+        let new_disk_consistent_lsn = timeline.get_disk_consistent_lsn();
+        anyhow::ensure!(
+            new_disk_consistent_lsn.is_valid(),
+            "Timeline {tenant_id}/{timeline_id} has invalid disk_consistent_lsn"
+        );
+        timeline
+            .load_layer_map(new_disk_consistent_lsn)
+            .await
+            .with_context(|| {
+                format!("Failed to load layermap for timeline {tenant_id}/{timeline_id}")
+            })?;
+
+        {
             // avoiding holding it across awaits
             let mut timelines_accessor = self.timelines.lock().unwrap();
-            if timelines_accessor.contains_key(&timeline_id) {
-                anyhow::bail!(
-                    "Timeline {tenant_id}/{timeline_id} already exists in the tenant map"
-                );
-            }
-
-            let dummy_timeline = self.create_timeline_data(
-                timeline_id,
-                up_to_date_metadata,
-                ancestor.clone(),
-                remote_client,
-                init_order,
-            )?;
-
-            let timeline = UninitializedTimeline {
-                owning_tenant: self,
-                timeline_id,
-                raw_timeline: Some((dummy_timeline, TimelineUninitMark::dummy())),
-            };
-            // Do not start walreceiver here. We do need loaded layer map for reconcile_with_remote
-            // But we shouldnt start walreceiver before we have all the data locally, because working walreceiver
-            // will ingest data which may require looking at the layers which are not yet available locally
-            match timeline.initialize_with_lock(ctx, &mut timelines_accessor, true) {
-                Ok(new_timeline) => new_timeline,
-                Err(e) => {
-                    error!("Failed to initialize timeline {tenant_id}/{timeline_id}: {e:?}");
-                    // FIXME using None is a hack, it wont hurt, just ugly.
-                    //     Ideally initialize_with_lock error should return timeline in the error
-                    //     Or return ownership of itself completely so somethin like into_broken
-                    //     can be called directly on Uninitielized timeline
-                    //     also leades to redundant .clone
-                    let broken_timeline = self
-                        .create_timeline_data(
-                            timeline_id,
-                            up_to_date_metadata,
-                            ancestor.clone(),
-                            None,
-                            None,
-                        )
-                        .with_context(|| {
-                            format!("creating broken timeline data for {tenant_id}/{timeline_id}")
-                        })?;
-                    broken_timeline.set_broken(e.to_string());
-                    timelines_accessor.insert(timeline_id, broken_timeline);
-                    return Err(e);
+            match timelines_accessor.entry(timeline_id) {
+                Entry::Occupied(_) => {
+                    // The uninit mark file acts as a lock that prevents another task from
+                    // initializing the timeline at the same time.
+                    unreachable!(
+                        "Timeline {tenant_id}/{timeline_id} already exists in the tenant map"
+                    );
+                }
+                Entry::Vacant(v) => {
+                    v.insert(Arc::clone(&timeline));
+                    timeline.maybe_spawn_flush_loop();
                 }
             }
         };
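The hunk replaces a separate `contains_key` check plus `insert` with a single `entry` lookup, which checks and inserts under one borrow of the map. A sketch of the pattern, with plain `String` values standing in for `Arc<Timeline>`:

use std::collections::hash_map::Entry;
use std::collections::HashMap;

fn register(map: &mut HashMap<u64, String>, id: u64, value: String) {
    match map.entry(id) {
        // With the uninit mark acting as a creation lock, a duplicate here is a
        // logic error, hence `unreachable!` rather than a recoverable bail.
        Entry::Occupied(_) => unreachable!("timeline {id} already exists in the map"),
        Entry::Vacant(v) => {
            v.insert(value);
        }
    }
}

fn main() {
    let mut timelines = HashMap::new();
    register(&mut timelines, 1, "timeline-1".to_owned());
    assert_eq!(timelines.len(), 1);
}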
@@ -590,16 +557,10 @@ impl Tenant {
                 .context("failed to reconcile with remote")?
         }
 
+        let layers = timeline.layer_mgr.read();
         // Sanity check: a timeline should have some content.
         anyhow::ensure!(
-            ancestor.is_some()
-                || timeline
-                    .layers
-                    .read()
-                    .unwrap()
-                    .iter_historic_layers()
-                    .next()
-                    .is_some(),
+            ancestor.is_some() || layers.iter_historic_layers().next().is_some(),
             "Timeline has no ancestor and no layer files"
         );
 
@@ -1164,14 +1125,14 @@ impl Tenant {
             .init_upload_queue_stopped_to_continue_deletion(&index_part)?;
 
         let timeline = self
-            .create_timeline_data(
+            .create_timeline_struct(
                 timeline_id,
                 &local_metadata,
                 ancestor,
                 Some(remote_client),
                 init_order,
             )
-            .context("create_timeline_data")?;
+            .context("create_timeline_struct")?;
 
         let guard = Arc::clone(&timeline.delete_lock).lock_owned().await;
 
@@ -1280,7 +1241,7 @@ impl Tenant {
     /// For tests, use `DatadirModification::init_empty_test_timeline` + `commit` to setup the
     /// minimum amount of keys required to get a writable timeline.
     /// (Without it, `put` might fail due to `repartition` failing.)
-    pub fn create_empty_timeline(
+    pub async fn create_empty_timeline(
         &self,
         new_timeline_id: TimelineId,
         initdb_lsn: Lsn,

@@ -1292,11 +1253,15 @@ impl Tenant {
             "Cannot create empty timelines on inactive tenant"
         );
 
-        let timelines = self.timelines.lock().unwrap();
-        let timeline_uninit_mark = self.create_timeline_uninit_mark(new_timeline_id, &timelines)?;
-        drop(timelines);
+        let timeline_uninit_mark = {
+            let timelines: MutexGuard<'_, HashMap<TimelineId, Arc<Timeline>>> =
+                self.timelines.lock().unwrap();
+            self.create_timeline_uninit_mark(new_timeline_id, &timelines)?
+        };
 
         let new_metadata = TimelineMetadata::new(
+            // Initialize disk_consistent LSN to 0, The caller must import some data to
+            // make it valid, before calling finish_creation()
             Lsn(0),
             None,
             None,

@@ -1305,13 +1270,14 @@ impl Tenant {
             initdb_lsn,
             pg_version,
         );
-        self.prepare_timeline(
+        self.prepare_new_timeline(
             new_timeline_id,
             &new_metadata,
             timeline_uninit_mark,
-            true,
+            initdb_lsn,
             None,
         )
+        .await
     }
 
     /// Helper for unit tests to create an emtpy timeline.
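The block-scoping of the lock in the hunk above is not cosmetic: a `std::sync::MutexGuard` is not `Send`, so holding it across the later `.await` would make the whole future unusable on a multi-threaded runtime. Wrapping the lock in its own block drops the guard before any await point. A minimal sketch, assuming tokio:

use std::collections::HashMap;
use std::sync::Mutex;

async fn create(timelines: &Mutex<HashMap<u64, String>>) {
    let uninit_mark = {
        // The guard lives only inside this block.
        let guard = timelines.lock().unwrap();
        format!("{} existing timelines", guard.len())
    }; // guard dropped here
    tokio::task::yield_now().await; // safe: no guard held across the await
    println!("created with mark: {uninit_mark}");
}

#[tokio::main]
async fn main() {
    let timelines = Mutex::new(HashMap::new());
    create(&timelines).await;
}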
@@ -1320,14 +1286,16 @@ impl Tenant {
     // This makes the various functions which anyhow::ensure! for Active state work in tests.
     // Our current tests don't need the background loops.
     #[cfg(test)]
-    pub fn create_test_timeline(
+    pub async fn create_test_timeline(
         &self,
         new_timeline_id: TimelineId,
         initdb_lsn: Lsn,
         pg_version: u32,
         ctx: &RequestContext,
     ) -> anyhow::Result<Arc<Timeline>> {
-        let uninit_tl = self.create_empty_timeline(new_timeline_id, initdb_lsn, pg_version, ctx)?;
+        let uninit_tl = self
+            .create_empty_timeline(new_timeline_id, initdb_lsn, pg_version, ctx)
+            .await?;
         let tline = uninit_tl.raw_timeline().expect("we just created it");
         assert_eq!(tline.get_last_record_lsn(), Lsn(0));
 
@@ -1338,12 +1306,14 @@ impl Tenant {
             .context("init_empty_test_timeline")?;
         modification
             .commit()
+            .await
             .context("commit init_empty_test_timeline modification")?;
 
-        let mut timelines = self.timelines.lock().unwrap();
-        // load_layers=false because create_empty_timeline already did that what's necessary (set next_open_layer)
-        // and modification.init_empty() already created layers.
-        let tl = uninit_tl.initialize_with_lock(ctx, &mut timelines, false)?;
+        // Flush to disk so that uninit_tl's check for valid disk_consistent_lsn passes.
+        tline.maybe_spawn_flush_loop();
+        tline.freeze_and_flush().await.context("freeze_and_flush")?;
+
+        let tl = uninit_tl.finish_creation()?;
         // The non-test code would call tl.activate() here.
         tl.set_state(TimelineState::Active);
         Ok(tl)
@@ -1589,7 +1559,7 @@ impl Tenant {
         // No timeout here, GC & Compaction should be responsive to the
         // `TimelineState::Stopping` change.
         info!("waiting for layer_removal_cs.lock()");
-        let layer_removal_guard = timeline.layer_removal_cs.lock().await;
+        let layer_removal_guard = timeline.lcache.delete_guard().await;
         info!("got layer_removal_cs.lock(), deleting layer files");
 
         // NB: storage_sync upload tasks that reference these layers have been cancelled
@@ -2275,7 +2245,12 @@ impl Tenant {
         }
     }
 
-    fn create_timeline_data(
+    /// Helper function to create a new Timeline struct.
+    ///
+    /// The returned Timeline is in Loading state. The caller is responsible for
+    /// initializing any on-disk state, and for inserting the Timeline to the 'timelines'
+    /// map.
+    fn create_timeline_struct(
         &self,
         new_timeline_id: TimelineId,
         new_metadata: &TimelineMetadata,
@@ -2695,7 +2670,7 @@ impl Tenant {
         src_timeline: &Arc<Timeline>,
         dst_id: TimelineId,
         start_lsn: Option<Lsn>,
-        ctx: &RequestContext,
+        _ctx: &RequestContext,
     ) -> anyhow::Result<Arc<Timeline>> {
         let src_id = src_timeline.timeline_id;
 
@@ -2780,17 +2755,17 @@ impl Tenant {
             src_timeline.pg_version,
         );
 
-        let new_timeline = {
-            let mut timelines = self.timelines.lock().unwrap();
-            self.prepare_timeline(
+        let uninitialized_timeline = self
+            .prepare_new_timeline(
                 dst_id,
                 &metadata,
                 timeline_uninit_mark,
-                false,
+                start_lsn + 1,
                 Some(Arc::clone(src_timeline)),
-            )?
-            .initialize_with_lock(ctx, &mut timelines, true)?
-        };
+            )
+            .await?;
+
+        let new_timeline = uninitialized_timeline.finish_creation()?;
 
         // Root timeline gets its layers during creation and uploads them along with the metadata.
         // A branch timeline though, when created, can get no writes for some time, hence won't get any layers created.
@@ -2866,8 +2841,15 @@ impl Tenant {
             pgdata_lsn,
             pg_version,
         );
-        let raw_timeline =
-            self.prepare_timeline(timeline_id, &new_metadata, timeline_uninit_mark, true, None)?;
+        let raw_timeline = self
+            .prepare_new_timeline(
+                timeline_id,
+                &new_metadata,
+                timeline_uninit_mark,
+                pgdata_lsn,
+                None,
+            )
+            .await?;
 
         let tenant_id = raw_timeline.owning_tenant.tenant_id;
         let unfinished_timeline = raw_timeline.raw_timeline()?;
@@ -2883,10 +2865,10 @@ impl Tenant {
                 format!("Failed to import pgdatadir for timeline {tenant_id}/{timeline_id}")
             })?;
 
-        // Flush the new layer files to disk, before we mark the timeline as available to
+        // Flush the new layer files to disk, before we make the timeline as available to
         // the outside world.
         //
-        // Thus spawn flush loop manually and skip flush_loop setup in initialize_with_lock
+        // Flush loop needs to be spawned in order to be able to flush.
         unfinished_timeline.maybe_spawn_flush_loop();
 
         fail::fail_point!("before-checkpoint-new-timeline", |_| {
@@ -2902,12 +2884,8 @@ impl Tenant {
                 )
             })?;
 
-        // Initialize the timeline without loading the layer map, because we already updated the layer
-        // map above, when we imported the datadir.
-        let timeline = {
-            let mut timelines = self.timelines.lock().unwrap();
-            raw_timeline.initialize_with_lock(ctx, &mut timelines, false)?
-        };
+        // All done!
+        let timeline = raw_timeline.finish_creation()?;
 
         info!(
             "created root timeline {} timeline.lsn {}",
@@ -2918,14 +2896,18 @@ impl Tenant {
         Ok(timeline)
     }
 
-    /// Creates intermediate timeline structure and its files, without loading it into memory.
-    /// It's up to the caller to import the necesary data and import the timeline into memory.
-    fn prepare_timeline(
+    /// Creates intermediate timeline structure and its files.
+    ///
+    /// An empty layer map is initialized, and new data and WAL can be imported starting
+    /// at 'disk_consistent_lsn'. After any initial data has been imported, call
+    /// `finish_creation` to insert the Timeline into the timelines map and to remove the
+    /// uninit mark file.
+    async fn prepare_new_timeline(
         &self,
         new_timeline_id: TimelineId,
         new_metadata: &TimelineMetadata,
         uninit_mark: TimelineUninitMark,
-        init_layers: bool,
+        start_lsn: Lsn,
         ancestor: Option<Arc<Timeline>>,
     ) -> anyhow::Result<UninitializedTimeline> {
         let tenant_id = self.tenant_id;
@@ -2943,33 +2925,27 @@ impl Tenant {
             None
         };
 
-        match self.create_timeline_files(
-            &uninit_mark.timeline_path,
-            new_timeline_id,
-            new_metadata,
-            ancestor,
-            remote_client,
-        ) {
-            Ok(new_timeline) => {
-                if init_layers {
-                    new_timeline.layers.write().unwrap().next_open_layer_at =
-                        Some(new_timeline.initdb_lsn);
-                }
-                debug!(
-                    "Successfully created initial files for timeline {tenant_id}/{new_timeline_id}"
-                );
-                Ok(UninitializedTimeline {
-                    owning_tenant: self,
-                    timeline_id: new_timeline_id,
-                    raw_timeline: Some((new_timeline, uninit_mark)),
-                })
-            }
-            Err(e) => {
-                error!("Failed to create initial files for timeline {tenant_id}/{new_timeline_id}, cleaning up: {e:?}");
-                cleanup_timeline_directory(uninit_mark);
-                Err(e)
-            }
+        let timeline_struct = self
+            .create_timeline_struct(new_timeline_id, new_metadata, ancestor, remote_client, None)
+            .context("Failed to create timeline data structure")?;
+
+        timeline_struct.init_empty_layer_map(start_lsn).await?;
+
+        if let Err(e) =
+            self.create_timeline_files(&uninit_mark.timeline_path, new_timeline_id, new_metadata)
+        {
+            error!("Failed to create initial files for timeline {tenant_id}/{new_timeline_id}, cleaning up: {e:?}");
+            cleanup_timeline_directory(uninit_mark);
+            return Err(e);
         }
+
+        debug!("Successfully created initial files for timeline {tenant_id}/{new_timeline_id}");
+
+        Ok(UninitializedTimeline {
+            owning_tenant: self,
+            timeline_id: new_timeline_id,
+            raw_timeline: Some((timeline_struct, uninit_mark)),
+        })
     }
 
     fn create_timeline_files(
@@ -2977,13 +2953,8 @@ impl Tenant {
         timeline_path: &Path,
         new_timeline_id: TimelineId,
         new_metadata: &TimelineMetadata,
-        ancestor: Option<Arc<Timeline>>,
-        remote_client: Option<RemoteTimelineClient>,
-    ) -> anyhow::Result<Arc<Timeline>> {
-        let timeline_data = self
-            .create_timeline_data(new_timeline_id, new_metadata, ancestor, remote_client, None)
-            .context("Failed to create timeline data structure")?;
-        crashsafe::create_dir_all(timeline_path).context("Failed to create timeline directory")?;
+    ) -> anyhow::Result<()> {
+        crashsafe::create_dir(timeline_path).context("Failed to create timeline directory")?;
 
         fail::fail_point!("after-timeline-uninit-mark-creation", |_| {
             anyhow::bail!("failpoint after-timeline-uninit-mark-creation");

@@ -2997,8 +2968,7 @@ impl Tenant {
             true,
         )
         .context("Failed to create timeline metadata")?;
 
-        Ok(timeline_data)
+        Ok(())
     }
 
     /// Attempts to create an uninit mark file for the timeline initialization.
@@ -3613,16 +3583,21 @@ mod tests {
     #[tokio::test]
     async fn test_basic() -> anyhow::Result<()> {
         let (tenant, ctx) = TenantHarness::create("test_basic")?.load().await;
-        let tline =
-            tenant.create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)?;
+        let tline = tenant
+            .create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
+            .await?;
 
-        let writer = tline.writer();
-        writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?;
+        let writer = tline.writer().await;
+        writer
+            .put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))
+            .await?;
         writer.finish_write(Lsn(0x10));
         drop(writer);
 
-        let writer = tline.writer();
-        writer.put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")))?;
+        let writer = tline.writer().await;
+        writer
+            .put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")))
+            .await?;
         writer.finish_write(Lsn(0x20));
         drop(writer);
 
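In all the test hunks that follow, `tline.writer()` becomes `tline.writer().await` and each `put` gains an `.await`: the write guard now comes from an async lock, so both acquisition and writes are await points. A sketch of that shape, assuming tokio; `TimelineStub` is a stand-in, not the pageserver API:

use tokio::sync::RwLock;

struct TimelineStub {
    data: RwLock<Vec<(u64, String)>>,
}

impl TimelineStub {
    // Analogous to `Timeline::writer().await`: await the lock, get a guard.
    async fn writer(&self) -> tokio::sync::RwLockWriteGuard<'_, Vec<(u64, String)>> {
        self.data.write().await
    }
}

#[tokio::main]
async fn main() {
    let tline = TimelineStub { data: RwLock::new(Vec::new()) };
    let mut writer = tline.writer().await;
    writer.push((0x10, "foo at 0x10".to_owned())); // stand-in for `put(...).await`
    drop(writer); // release the lock before other tasks need it
    assert_eq!(tline.data.read().await.len(), 1);
}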
@@ -3647,9 +3622,14 @@ mod tests {
         let (tenant, ctx) = TenantHarness::create("no_duplicate_timelines")?
             .load()
             .await;
-        let _ = tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
+        let _ = tenant
+            .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
+            .await?;
 
-        match tenant.create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx) {
+        match tenant
+            .create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
+            .await
+        {
             Ok(_) => panic!("duplicate timeline creation should fail"),
             Err(e) => assert_eq!(
                 e.to_string(),
@@ -3678,9 +3658,10 @@ mod tests {
         use std::str::from_utf8;
 
         let (tenant, ctx) = TenantHarness::create("test_branch")?.load().await;
-        let tline =
-            tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
-        let writer = tline.writer();
+        let tline = tenant
+            .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
+            .await?;
+        let writer = tline.writer().await;
 
         #[allow(non_snake_case)]
         let TEST_KEY_A: Key = Key::from_hex("112222222233333333444444445500000001").unwrap();
@@ -3688,13 +3669,21 @@ mod tests {
         let TEST_KEY_B: Key = Key::from_hex("112222222233333333444444445500000002").unwrap();
 
         // Insert a value on the timeline
-        writer.put(TEST_KEY_A, Lsn(0x20), &test_value("foo at 0x20"))?;
-        writer.put(TEST_KEY_B, Lsn(0x20), &test_value("foobar at 0x20"))?;
+        writer
+            .put(TEST_KEY_A, Lsn(0x20), &test_value("foo at 0x20"))
+            .await?;
+        writer
+            .put(TEST_KEY_B, Lsn(0x20), &test_value("foobar at 0x20"))
+            .await?;
         writer.finish_write(Lsn(0x20));
 
-        writer.put(TEST_KEY_A, Lsn(0x30), &test_value("foo at 0x30"))?;
+        writer
+            .put(TEST_KEY_A, Lsn(0x30), &test_value("foo at 0x30"))
+            .await?;
         writer.finish_write(Lsn(0x30));
-        writer.put(TEST_KEY_A, Lsn(0x40), &test_value("foo at 0x40"))?;
+        writer
+            .put(TEST_KEY_A, Lsn(0x40), &test_value("foo at 0x40"))
+            .await?;
         writer.finish_write(Lsn(0x40));
 
         //assert_current_logical_size(&tline, Lsn(0x40));

@@ -3706,8 +3695,10 @@ mod tests {
         let newtline = tenant
             .get_timeline(NEW_TIMELINE_ID, true)
             .expect("Should have a local timeline");
-        let new_writer = newtline.writer();
-        new_writer.put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40"))?;
+        let new_writer = newtline.writer().await;
+        new_writer
+            .put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40"))
+            .await?;
         new_writer.finish_write(Lsn(0x40));
 
         // Check page contents on both branches
@@ -3733,38 +3724,46 @@ mod tests {
         let mut lsn = start_lsn;
         #[allow(non_snake_case)]
         {
-            let writer = tline.writer();
+            let writer = tline.writer().await;
             // Create a relation on the timeline
-            writer.put(
-                *TEST_KEY,
-                lsn,
-                &Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
-            )?;
+            writer
+                .put(
+                    *TEST_KEY,
+                    lsn,
+                    &Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
+                )
+                .await?;
             writer.finish_write(lsn);
             lsn += 0x10;
-            writer.put(
-                *TEST_KEY,
-                lsn,
-                &Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
-            )?;
+            writer
+                .put(
+                    *TEST_KEY,
+                    lsn,
+                    &Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
+                )
+                .await?;
             writer.finish_write(lsn);
             lsn += 0x10;
         }
         tline.freeze_and_flush().await?;
         {
-            let writer = tline.writer();
-            writer.put(
-                *TEST_KEY,
-                lsn,
-                &Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
-            )?;
+            let writer = tline.writer().await;
+            writer
+                .put(
+                    *TEST_KEY,
+                    lsn,
+                    &Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
+                )
+                .await?;
             writer.finish_write(lsn);
             lsn += 0x10;
-            writer.put(
-                *TEST_KEY,
-                lsn,
-                &Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
-            )?;
+            writer
+                .put(
+                    *TEST_KEY,
+                    lsn,
+                    &Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
+                )
+                .await?;
             writer.finish_write(lsn);
         }
         tline.freeze_and_flush().await
@@ -3776,8 +3775,9 @@ mod tests {
             TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")?
                 .load()
                 .await;
-        let tline =
-            tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
+        let tline = tenant
+            .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
+            .await?;
         make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
 
         // this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
@@ -3814,8 +3814,9 @@ mod tests {
                 .load()
                 .await;
 
-        let tline =
-            tenant.create_test_timeline(TIMELINE_ID, Lsn(0x50), DEFAULT_PG_VERSION, &ctx)?;
+        let tline = tenant
+            .create_test_timeline(TIMELINE_ID, Lsn(0x50), DEFAULT_PG_VERSION, &ctx)
+            .await?;
         // try to branch at lsn 0x25, should fail because initdb lsn is 0x50
         match tenant
             .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x25)), &ctx)
@@ -3864,8 +3865,9 @@ mod tests {
             TenantHarness::create("test_get_branchpoints_from_an_inactive_timeline")?
                 .load()
                 .await;
-        let tline =
-            tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
+        let tline = tenant
+            .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
+            .await?;
         make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
 
         tenant
@@ -3913,8 +3915,9 @@ mod tests {
             TenantHarness::create("test_retain_data_in_parent_which_is_needed_for_child")?
                 .load()
                 .await;
-        let tline =
-            tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
+        let tline = tenant
+            .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
+            .await?;
         make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
 
         tenant
@@ -3937,8 +3940,9 @@ mod tests {
             TenantHarness::create("test_parent_keeps_data_forever_after_branching")?
                 .load()
                 .await;
-        let tline =
-            tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
+        let tline = tenant
+            .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
+            .await?;
         make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
 
         tenant
@@ -3970,8 +3974,9 @@ mod tests {
         let harness = TenantHarness::create(TEST_NAME)?;
         {
             let (tenant, ctx) = harness.load().await;
-            let tline =
-                tenant.create_test_timeline(TIMELINE_ID, Lsn(0x7000), DEFAULT_PG_VERSION, &ctx)?;
+            let tline = tenant
+                .create_test_timeline(TIMELINE_ID, Lsn(0x7000), DEFAULT_PG_VERSION, &ctx)
+                .await?;
             make_some_layers(tline.as_ref(), Lsn(0x8000)).await?;
         }
 
@@ -3990,8 +3995,9 @@ mod tests {
         // create two timelines
         {
             let (tenant, ctx) = harness.load().await;
-            let tline =
-                tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
+            let tline = tenant
+                .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
+                .await?;
 
             make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
 
@@ -4028,8 +4034,9 @@ mod tests {
         let harness = TenantHarness::create(TEST_NAME)?;
         let (tenant, ctx) = harness.load().await;
 
-        let tline =
-            tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
+        let tline = tenant
+            .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
+            .await?;
         drop(tline);
         drop(tenant);
 
@@ -4067,35 +4074,44 @@ mod tests {
     #[tokio::test]
     async fn test_images() -> anyhow::Result<()> {
         let (tenant, ctx) = TenantHarness::create("test_images")?.load().await;
-        let tline =
-            tenant.create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)?;
+        let tline = tenant
+            .create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
+            .await?;
 
-        let writer = tline.writer();
-        writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?;
+        let writer = tline.writer().await;
+        writer
+            .put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))
+            .await?;
         writer.finish_write(Lsn(0x10));
         drop(writer);
 
         tline.freeze_and_flush().await?;
         tline.compact(&ctx).await?;
 
-        let writer = tline.writer();
-        writer.put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")))?;
+        let writer = tline.writer().await;
+        writer
+            .put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")))
+            .await?;
         writer.finish_write(Lsn(0x20));
         drop(writer);
 
         tline.freeze_and_flush().await?;
         tline.compact(&ctx).await?;
 
-        let writer = tline.writer();
-        writer.put(*TEST_KEY, Lsn(0x30), &Value::Image(TEST_IMG("foo at 0x30")))?;
+        let writer = tline.writer().await;
+        writer
+            .put(*TEST_KEY, Lsn(0x30), &Value::Image(TEST_IMG("foo at 0x30")))
+            .await?;
         writer.finish_write(Lsn(0x30));
         drop(writer);
 
         tline.freeze_and_flush().await?;
         tline.compact(&ctx).await?;
 
-        let writer = tline.writer();
-        writer.put(*TEST_KEY, Lsn(0x40), &Value::Image(TEST_IMG("foo at 0x40")))?;
+        let writer = tline.writer().await;
+        writer
+            .put(*TEST_KEY, Lsn(0x40), &Value::Image(TEST_IMG("foo at 0x40")))
+            .await?;
         writer.finish_write(Lsn(0x40));
         drop(writer);
 
@@ -4133,8 +4149,9 @@ mod tests {
     #[tokio::test]
     async fn test_bulk_insert() -> anyhow::Result<()> {
         let (tenant, ctx) = TenantHarness::create("test_bulk_insert")?.load().await;
-        let tline =
-            tenant.create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)?;
+        let tline = tenant
+            .create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
+            .await?;
 
         let mut lsn = Lsn(0x10);
 
@@ -4145,12 +4162,14 @@ mod tests {
         for _ in 0..50 {
             for _ in 0..10000 {
                 test_key.field6 = blknum;
-                let writer = tline.writer();
-                writer.put(
-                    test_key,
-                    lsn,
-                    &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
-                )?;
+                let writer = tline.writer().await;
+                writer
+                    .put(
+                        test_key,
+                        lsn,
+                        &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
+                    )
+                    .await?;
                 writer.finish_write(lsn);
                 drop(writer);
 
@@ -4176,8 +4195,9 @@ mod tests {
     #[tokio::test]
     async fn test_random_updates() -> anyhow::Result<()> {
         let (tenant, ctx) = TenantHarness::create("test_random_updates")?.load().await;
-        let tline =
-            tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
+        let tline = tenant
+            .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
+            .await?;
 
         const NUM_KEYS: usize = 1000;
 
@@ -4194,12 +4214,14 @@ mod tests {
         for blknum in 0..NUM_KEYS {
             lsn = Lsn(lsn.0 + 0x10);
             test_key.field6 = blknum as u32;
-            let writer = tline.writer();
-            writer.put(
-                test_key,
-                lsn,
-                &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
-            )?;
+            let writer = tline.writer().await;
+            writer
+                .put(
+                    test_key,
+                    lsn,
+                    &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
+                )
+                .await?;
             writer.finish_write(lsn);
             updated[blknum] = lsn;
             drop(writer);
@@ -4212,12 +4234,14 @@ mod tests {
             lsn = Lsn(lsn.0 + 0x10);
             let blknum = thread_rng().gen_range(0..NUM_KEYS);
             test_key.field6 = blknum as u32;
-            let writer = tline.writer();
-            writer.put(
-                test_key,
-                lsn,
-                &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
-            )?;
+            let writer = tline.writer().await;
+            writer
+                .put(
+                    test_key,
+                    lsn,
+                    &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
+                )
+                .await?;
             writer.finish_write(lsn);
             drop(writer);
             updated[blknum] = lsn;
@@ -4250,8 +4274,9 @@ mod tests {
         let (tenant, ctx) = TenantHarness::create("test_traverse_branches")?
             .load()
            .await;
-        let mut tline =
-            tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
+        let mut tline = tenant
+            .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
+            .await?;
 
         const NUM_KEYS: usize = 1000;
 
@@ -4268,12 +4293,14 @@ mod tests {
         for blknum in 0..NUM_KEYS {
             lsn = Lsn(lsn.0 + 0x10);
             test_key.field6 = blknum as u32;
-            let writer = tline.writer();
-            writer.put(
-                test_key,
-                lsn,
-                &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
-            )?;
+            let writer = tline.writer().await;
+            writer
+                .put(
+                    test_key,
+                    lsn,
+                    &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
+                )
+                .await?;
             writer.finish_write(lsn);
             updated[blknum] = lsn;
             drop(writer);
@@ -4294,12 +4321,14 @@ mod tests {
             lsn = Lsn(lsn.0 + 0x10);
             let blknum = thread_rng().gen_range(0..NUM_KEYS);
             test_key.field6 = blknum as u32;
-            let writer = tline.writer();
-            writer.put(
-                test_key,
-                lsn,
-                &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
-            )?;
+            let writer = tline.writer().await;
+            writer
+                .put(
+                    test_key,
+                    lsn,
+                    &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
+                )
+                .await?;
             println!("updating {} at {}", blknum, lsn);
             writer.finish_write(lsn);
             drop(writer);
@@ -4333,8 +4362,9 @@ mod tests {
         let (tenant, ctx) = TenantHarness::create("test_traverse_ancestors")?
            .load()
             .await;
-        let mut tline =
-            tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
+        let mut tline = tenant
+            .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
+            .await?;
 
         const NUM_KEYS: usize = 100;
         const NUM_TLINES: usize = 50;
@@ -4359,12 +4389,14 @@ mod tests {
                 lsn = Lsn(lsn.0 + 0x10);
                 let blknum = thread_rng().gen_range(0..NUM_KEYS);
                 test_key.field6 = blknum as u32;
-                let writer = tline.writer();
-                writer.put(
-                    test_key,
-                    lsn,
-                    &Value::Image(TEST_IMG(&format!("{} {} at {}", idx, blknum, lsn))),
-                )?;
+                let writer = tline.writer().await;
+                writer
+                    .put(
+                        test_key,
+                        lsn,
+                        &Value::Image(TEST_IMG(&format!("{} {} at {}", idx, blknum, lsn))),
+                    )
+                    .await?;
                 println!("updating [{}][{}] at {}", idx, blknum, lsn);
                 writer.finish_write(lsn);
                 drop(writer);
@@ -4397,8 +4429,9 @@ mod tests {
             .await;
 
         let initdb_lsn = Lsn(0x20);
-        let utline =
-            tenant.create_empty_timeline(TIMELINE_ID, initdb_lsn, DEFAULT_PG_VERSION, &ctx)?;
+        let utline = tenant
+            .create_empty_timeline(TIMELINE_ID, initdb_lsn, DEFAULT_PG_VERSION, &ctx)
+            .await?;
         let tline = utline.raw_timeline().unwrap();
 
         // Spawn flush loop now so that we can set the `expect_initdb_optimization`
@@ -4435,6 +4468,7 @@ mod tests {
             .context("init_empty_test_timeline")?;
         modification
             .commit()
+            .await
             .context("commit init_empty_test_timeline modification")?;
 
         // Do the flush. The flush code will check the expectations that we set above.
143 pageserver/src/tenant/layer_cache.rs Normal file
@@ -0,0 +1,143 @@
|
||||
use super::storage_layer::{PersistentLayer, PersistentLayerDesc, PersistentLayerKey, RemoteLayer};
|
||||
use super::Timeline;
|
||||
use crate::tenant::layer_map::{self, LayerMap};
|
||||
use anyhow::Result;
|
||||
use std::sync::{Mutex, Weak};
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
|
||||
pub struct LayerCache {
|
||||
/// Layer removal lock.
|
||||
/// A lock to ensure that no layer of the timeline is removed concurrently by other tasks.
|
||||
/// This lock is acquired in [`Timeline::gc`], [`Timeline::compact`],
|
||||
/// and [`Tenant::delete_timeline`]. This is an `Arc<Mutex>` lock because we need an owned
|
||||
/// lock guard in functions that will be spawned to tokio I/O pool (which requires `'static`).
|
||||
pub layers_removal_lock: Arc<tokio::sync::Mutex<()>>,
|
||||
|
||||
/// We need this lock b/c we do not have any way to prevent GC/compaction from removing files in-use.
|
||||
/// We need to do reference counting on Arc to prevent this from happening, and we can safely remove this lock.
|
||||
pub layers_operation_lock: Arc<tokio::sync::RwLock<()>>,
|
||||
|
||||
/// Will be useful when we move evict / download to layer cache.
|
||||
#[allow(unused)]
|
||||
timeline: Weak<Timeline>,
|
||||
|
||||
mapping: Mutex<HashMap<PersistentLayerKey, Arc<dyn PersistentLayer>>>,
|
||||
}
|
||||
|
||||
pub struct LayerInUseWrite(tokio::sync::OwnedRwLockWriteGuard<()>);
|
||||
|
||||
pub struct LayerInUseRead(tokio::sync::OwnedRwLockReadGuard<()>);
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct DeleteGuard(Arc<tokio::sync::OwnedMutexGuard<()>>);
|
||||
|
||||
impl LayerCache {
|
||||
pub fn new(timeline: Weak<Timeline>) -> Self {
|
||||
Self {
|
||||
layers_operation_lock: Arc::new(tokio::sync::RwLock::new(())),
|
||||
layers_removal_lock: Arc::new(tokio::sync::Mutex::new(())),
|
||||
mapping: Mutex::new(HashMap::new()),
|
||||
timeline,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Arc<dyn PersistentLayer> {
|
||||
let guard = self.mapping.lock().unwrap();
|
||||
guard.get(&desc.key()).expect("not found").clone()
|
||||
}
|
||||
|
||||
/// This function is to mock the original behavior of `layers` lock in `Timeline`. Can be removed after we ensure
|
||||
/// we won't delete files that are being read.
|
||||
pub async fn layer_in_use_write(&self) -> LayerInUseWrite {
|
||||
LayerInUseWrite(self.layers_operation_lock.clone().write_owned().await)
|
||||
}
|
||||
|
||||
/// This function is to mock the original behavior of `layers` lock in `Timeline`. Can be removed after we ensure
|
||||
/// we won't delete files that are being read.
|
||||
pub async fn layer_in_use_read(&self) -> LayerInUseRead {
|
||||
LayerInUseRead(self.layers_operation_lock.clone().read_owned().await)
|
||||
}
|
||||
|
||||
/// Ensures only one of compaction / gc can happen at a time.
|
||||
pub async fn delete_guard(&self) -> DeleteGuard {
|
||||
DeleteGuard(Arc::new(
|
||||
self.layers_removal_lock.clone().lock_owned().await,
|
||||
))
|
||||
}
|
||||
|
||||
/// Should only be called when initializing the timeline. Bypass checks and layer operation lock.
|
||||
pub fn remove_local_when_init(&self, layer: Arc<dyn PersistentLayer>) {
|
||||
let mut guard = self.mapping.lock().unwrap();
|
||||
guard.remove(&layer.layer_desc().key());
|
||||
}
|
||||
|
||||
/// Should only be called when initializing the timeline. Bypass checks and layer operation lock.
|
||||
pub fn populate_remote_when_init(&self, layer: Arc<RemoteLayer>) {
|
||||
let mut guard = self.mapping.lock().unwrap();
|
||||
guard.insert(layer.layer_desc().key(), layer);
|
||||
}
|
||||
|
||||
/// Should only be called when initializing the timeline. Bypass checks and layer operation lock.
|
||||
pub fn populate_local_when_init(&self, layer: Arc<dyn PersistentLayer>) {
|
||||
let mut guard = self.mapping.lock().unwrap();
|
||||
guard.insert(layer.layer_desc().key(), layer);
|
||||
}
|
||||
|
||||
/// Called within read path.
|
||||
pub fn replace_and_verify(
|
||||
&self,
|
||||
expected: Arc<dyn PersistentLayer>,
|
||||
new: Arc<dyn PersistentLayer>,
|
||||
) -> Result<()> {
|
||||
let mut guard = self.mapping.lock().unwrap();
|
||||
|
||||
use super::layer_map::LayerKey;
|
||||
let key = LayerKey::from(&*expected);
|
||||
let other = LayerKey::from(&*new);
|
||||
|
||||
let expected_l0 = LayerMap::is_l0(expected.layer_desc());
|
||||
let new_l0 = LayerMap::is_l0(new.layer_desc());
|
||||
|
||||
fail::fail_point!("layermap-replace-notfound", |_| anyhow::bail!(
|
||||
"replacing downloaded layer into layermap failed because layer was not found"
|
||||
));
|
||||
|
||||
anyhow::ensure!(
|
||||
key == other,
|
||||
"replacing downloaded layer into layermap failed because two layers have different keys: {key:?} != {other:?}"
|
||||
);
|
||||
|
||||
anyhow::ensure!(
|
||||
expected_l0 == new_l0,
|
||||
"replacing downloaded layer into layermap failed because one layer is l0 while the other is not: {expected_l0} != {new_l0}"
|
||||
);
|
||||
|
||||
if let Some(layer) = guard.get_mut(&expected.layer_desc().key()) {
|
||||
anyhow::ensure!(
|
||||
layer_map::compare_arced_layers(&expected, layer),
|
||||
"replacing downloaded layer into layermap failed because another layer was found instead of expected, expected={expected:?}, new={new:?}",
|
||||
expected = Arc::as_ptr(&expected),
|
||||
new = Arc::as_ptr(layer),
|
||||
);
|
||||
*layer = new;
|
||||
Ok(())
|
||||
} else {
|
||||
anyhow::bail!(
|
||||
"replacing downloaded layer into layermap failed because layer was not found"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// Called within write path. When compaction and image layer creation we will create new layers.
|
||||
pub fn create_new_layer(&self, layer: Arc<dyn PersistentLayer>) {
|
||||
let mut guard = self.mapping.lock().unwrap();
|
||||
guard.insert(layer.layer_desc().key(), layer);
|
||||
}
|
||||
|
||||
/// Called within write path. When GC and compaction we will remove layers and delete them on disk.
|
||||
/// Will move logic to delete files here later.
|
||||
pub fn delete_layer(&self, layer: Arc<dyn PersistentLayer>) {
|
||||
let mut guard = self.mapping.lock().unwrap();
|
||||
guard.remove(&layer.layer_desc().key());
|
||||
}
|
||||
}
|
||||
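The read path is the main consumer of `replace_and_verify`: when a remote layer finishes downloading, the in-memory object is swapped for the resident one while the mapping mutex guarantees atomicity. A minimal sketch of such a call site, assuming a caller that already holds both `Arc`s (the exact call sites are not shown in this diff):

    async fn swap_in_downloaded_layer(
        lcache: &LayerCache,
        remote: Arc<dyn PersistentLayer>,
        downloaded: Arc<dyn PersistentLayer>,
    ) -> anyhow::Result<()> {
        // Hold the read guard so GC/compaction cannot delete files under us.
        let _guard = lcache.layer_in_use_read().await;
        // Verify-and-replace happens atomically under the mapping mutex.
        lcache.replace_and_verify(remote, downloaded)
    }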
@@ -51,25 +51,23 @@ use crate::keyspace::KeyPartitioning;
use crate::repository::Key;
use crate::tenant::storage_layer::InMemoryLayer;
use crate::tenant::storage_layer::Layer;
use anyhow::Context;
use anyhow::Result;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::ops::Range;
use std::sync::Arc;
use utils::lsn::Lsn;

use historic_layer_coverage::BufferedHistoricLayerCoverage;
pub use historic_layer_coverage::Replacement;
pub use historic_layer_coverage::{LayerKey, Replacement};

use super::storage_layer::range_eq;
use super::storage_layer::PersistentLayerDesc;
use super::storage_layer::PersistentLayerKey;

///
/// LayerMap tracks what layers exist on a timeline.
///
pub struct LayerMap<L: ?Sized> {
#[derive(Default, Clone)]
pub struct LayerMap {
    //
    // 'open_layer' holds the current InMemoryLayer that is accepting new
    // records. If it is None, 'next_open_layer_at' will be set instead, indicating
@@ -95,24 +93,6 @@ pub struct LayerMap<L: ?Sized> {
    /// L0 layers have key range Key::MIN..Key::MAX, and locating them using R-Tree search is very inefficient.
    /// So L0 layers are held in l0_delta_layers vector, in addition to the R-tree.
    l0_delta_layers: Vec<Arc<PersistentLayerDesc>>,

    /// Mapping from persistent layer key to the actual layer object. Currently, it stores delta, image, and
    /// remote layers. In future refactors, this will be eventually moved out of LayerMap into Timeline, and
    /// RemoteLayer will be removed.
    mapping: HashMap<PersistentLayerKey, Arc<L>>,
}

impl<L: ?Sized> Default for LayerMap<L> {
    fn default() -> Self {
        Self {
            open_layer: None,
            next_open_layer_at: None,
            frozen_layers: VecDeque::default(),
            l0_delta_layers: Vec::default(),
            historic: BufferedHistoricLayerCoverage::default(),
            mapping: HashMap::default(),
        }
    }
}

/// The primary update API for the layer map.
@@ -120,24 +100,21 @@ impl<L: ?Sized> Default for LayerMap<L> {
/// Batching historic layer insertions and removals is good for
/// performance and this struct helps us do that correctly.
#[must_use]
pub struct BatchedUpdates<'a, L: ?Sized + Layer> {
pub struct BatchedUpdates<'a> {
    // While we hold this exclusive reference to the layer map the type checker
    // will prevent us from accidentally reading any unflushed updates.
    layer_map: &'a mut LayerMap<L>,
    layer_map: &'a mut LayerMap,
}

/// Provide ability to batch more updates while hiding the read
/// API so we don't accidentally read without flushing.
impl<L> BatchedUpdates<'_, L>
where
    L: ?Sized + Layer,
{
impl BatchedUpdates<'_> {
    ///
    /// Insert an on-disk layer.
    ///
    // TODO remove the `layer` argument when `mapping` is refactored out of `LayerMap`
    pub fn insert_historic(&mut self, layer_desc: PersistentLayerDesc, layer: Arc<L>) {
        self.layer_map.insert_historic_noflush(layer_desc, layer)
    pub fn insert_historic(&mut self, layer_desc: PersistentLayerDesc) {
        self.layer_map.insert_historic_noflush(layer_desc)
    }

    ///
@@ -145,31 +122,8 @@ where
    ///
    /// This should be called when the corresponding file on disk has been deleted.
    ///
    pub fn remove_historic(&mut self, layer_desc: PersistentLayerDesc, layer: Arc<L>) {
        self.layer_map.remove_historic_noflush(layer_desc, layer)
    }

    /// Replaces existing layer iff it is the `expected`.
    ///
    /// If the expected layer has been removed it will not be inserted by this function.
    ///
    /// Returned `Replacement` describes succeeding in replacement or the reason why it could not
    /// be done.
    ///
    /// TODO replacement can be done without buffering and rebuilding layer map updates.
    /// One way to do that is to add a layer of indirection for returned values, so
    /// that we can replace values only by updating a hashmap.
    pub fn replace_historic(
        &mut self,
        expected_desc: PersistentLayerDesc,
        expected: &Arc<L>,
        new_desc: PersistentLayerDesc,
        new: Arc<L>,
    ) -> anyhow::Result<Replacement<Arc<L>>> {
        fail::fail_point!("layermap-replace-notfound", |_| Ok(Replacement::NotFound));

        self.layer_map
            .replace_historic_noflush(expected_desc, expected, new_desc, new)
    pub fn remove_historic(&mut self, layer_desc: PersistentLayerDesc) {
        self.layer_map.remove_historic_noflush(layer_desc)
    }

    // We will flush on drop anyway, but this method makes it
@@ -185,25 +139,19 @@ where
// than panic later or read without flushing.
//
// TODO maybe warn if flush hasn't explicitly been called
impl<L> Drop for BatchedUpdates<'_, L>
where
    L: ?Sized + Layer,
{
impl Drop for BatchedUpdates<'_> {
    fn drop(&mut self) {
        self.layer_map.flush_updates();
    }
}
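The effect of this refactor on callers is that a batched update now works purely on descriptors and still flushes on drop. A small illustrative sketch (the helper name `apply_compaction_results` is invented for this example):

    fn apply_compaction_results(
        layer_map: &mut LayerMap,
        new_layers: Vec<PersistentLayerDesc>,
        removed_layers: Vec<PersistentLayerDesc>,
    ) {
        let mut updates = layer_map.batch_update();
        for desc in new_layers {
            updates.insert_historic(desc);
        }
        for desc in removed_layers {
            updates.remove_historic(desc);
        }
        // Dropping `updates` would also flush; calling flush() makes it explicit.
        updates.flush();
    }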

/// Return value of LayerMap::search
pub struct SearchResult<L: ?Sized> {
    pub layer: Arc<L>,
pub struct SearchResult {
    pub layer: Arc<PersistentLayerDesc>,
    pub lsn_floor: Lsn,
}

impl<L> LayerMap<L>
where
    L: ?Sized + Layer,
{
impl LayerMap {
    ///
    /// Find the latest layer (by lsn.end) that covers the given
    /// 'key', with lsn.start < 'end_lsn'.
@@ -235,7 +183,7 @@ where
    /// NOTE: This only searches the 'historic' layers, *not* the
    /// 'open' and 'frozen' layers!
    ///
    pub fn search(&self, key: Key, end_lsn: Lsn) -> Option<SearchResult<L>> {
    pub fn search(&self, key: Key, end_lsn: Lsn) -> Option<SearchResult> {
        let version = self.historic.get().unwrap().get_version(end_lsn.0 - 1)?;
        let latest_delta = version.delta_coverage.query(key.to_i128());
        let latest_image = version.image_coverage.query(key.to_i128());
@@ -244,7 +192,6 @@ where
            (None, None) => None,
            (None, Some(image)) => {
                let lsn_floor = image.get_lsn_range().start;
                let image = self.get_layer_from_mapping(&image.key()).clone();
                Some(SearchResult {
                    layer: image,
                    lsn_floor,
@@ -252,7 +199,6 @@ where
            }
            (Some(delta), None) => {
                let lsn_floor = delta.get_lsn_range().start;
                let delta = self.get_layer_from_mapping(&delta.key()).clone();
                Some(SearchResult {
                    layer: delta,
                    lsn_floor,
@@ -263,7 +209,6 @@ where
                let image_is_newer = image.get_lsn_range().end >= delta.get_lsn_range().end;
                let image_exact_match = img_lsn + 1 == end_lsn;
                if image_is_newer || image_exact_match {
                    let image = self.get_layer_from_mapping(&image.key()).clone();
                    Some(SearchResult {
                        layer: image,
                        lsn_floor: img_lsn,
@@ -271,7 +216,6 @@ where
                } else {
                    let lsn_floor =
                        std::cmp::max(delta.get_lsn_range().start, image.get_lsn_range().start + 1);
                    let delta = self.get_layer_from_mapping(&delta.key()).clone();
                    Some(SearchResult {
                        layer: delta,
                        lsn_floor,
@@ -282,7 +226,7 @@ where
    }

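With `SearchResult` carrying only a descriptor, the read path becomes a two-step lookup: search the map, then resolve the descriptor to a concrete layer object. A sketch of the pattern, assuming a `Timeline` exposing the `layer_mgr` and `lcache` fields used elsewhere in this diff:

    fn resolve_layer(
        timeline: &Timeline,
        key: Key,
        end_lsn: Lsn,
    ) -> Option<Arc<dyn PersistentLayer>> {
        let layers = timeline.layer_mgr.read();
        let result = layers.search(key, end_lsn)?;
        // Map the descriptor back to the actual layer object.
        Some(timeline.lcache.get_from_desc(&result.layer))
    }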
    /// Start a batch of updates, applied on drop
    pub fn batch_update(&mut self) -> BatchedUpdates<'_, L> {
    pub fn batch_update(&mut self) -> BatchedUpdates<'_> {
        BatchedUpdates { layer_map: self }
    }

@@ -292,48 +236,32 @@ where
    /// Helper function for BatchedUpdates::insert_historic
    ///
    /// TODO(chi): remove L generic so that we do not need to pass layer object.
    pub(self) fn insert_historic_noflush(
        &mut self,
        layer_desc: PersistentLayerDesc,
        layer: Arc<L>,
    ) {
        self.mapping.insert(layer_desc.key(), layer.clone());

    pub(self) fn insert_historic_noflush(&mut self, layer_desc: PersistentLayerDesc) {
        // TODO: See #3869, resulting #4088, attempted fix and repro #4094

        if Self::is_l0(&layer) {
        if Self::is_l0(&layer_desc) {
            self.l0_delta_layers.push(layer_desc.clone().into());
        }

        self.historic.insert(
            historic_layer_coverage::LayerKey::from(&*layer),
            historic_layer_coverage::LayerKey::from(&layer_desc),
            layer_desc.into(),
        );
    }

    fn get_layer_from_mapping(&self, key: &PersistentLayerKey) -> &Arc<L> {
        let layer = self
            .mapping
            .get(key)
            .with_context(|| format!("{key:?}"))
            .expect("inconsistent layer mapping");
        layer
    }

    ///
    /// Remove an on-disk layer from the map.
    ///
    /// Helper function for BatchedUpdates::remove_historic
    ///
    pub fn remove_historic_noflush(&mut self, layer_desc: PersistentLayerDesc, layer: Arc<L>) {
    pub fn remove_historic_noflush(&mut self, layer_desc: PersistentLayerDesc) {
        self.historic
            .remove(historic_layer_coverage::LayerKey::from(&*layer));
        if Self::is_l0(&layer) {
            .remove(historic_layer_coverage::LayerKey::from(&layer_desc));
        let layer_key = layer_desc.key();
        if Self::is_l0(&layer_desc) {
            let len_before = self.l0_delta_layers.len();
            let mut l0_delta_layers = std::mem::take(&mut self.l0_delta_layers);
            l0_delta_layers.retain(|other| {
                !Self::compare_arced_layers(self.get_layer_from_mapping(&other.key()), &layer)
            });
            l0_delta_layers.retain(|other| other.key() != layer_key);
            self.l0_delta_layers = l0_delta_layers;
            // this assertion is related to use of Arc::ptr_eq in Self::compare_arced_layers,
            // there's a chance that the comparison fails at runtime due to it comparing (pointer,
@@ -344,69 +272,6 @@ where
                "failed to locate removed historic layer from l0_delta_layers"
            );
        }
        self.mapping.remove(&layer_desc.key());
    }

    pub(self) fn replace_historic_noflush(
        &mut self,
        expected_desc: PersistentLayerDesc,
        expected: &Arc<L>,
        new_desc: PersistentLayerDesc,
        new: Arc<L>,
    ) -> anyhow::Result<Replacement<Arc<L>>> {
        let key = historic_layer_coverage::LayerKey::from(&**expected);
        let other = historic_layer_coverage::LayerKey::from(&*new);

        let expected_l0 = Self::is_l0(expected);
        let new_l0 = Self::is_l0(&new);

        anyhow::ensure!(
            key == other,
            "expected and new must have equal LayerKeys: {key:?} != {other:?}"
        );

        anyhow::ensure!(
            expected_l0 == new_l0,
            "expected and new must both be l0 deltas or neither should be: {expected_l0} != {new_l0}"
        );

        let l0_index = if expected_l0 {
            // find the index in case replace worked, we need to replace that as well
            let pos = self.l0_delta_layers.iter().position(|slot| {
                Self::compare_arced_layers(self.get_layer_from_mapping(&slot.key()), expected)
            });

            if pos.is_none() {
                return Ok(Replacement::NotFound);
            }
            pos
        } else {
            None
        };

        let new_desc = Arc::new(new_desc);
        let replaced = self.historic.replace(&key, new_desc.clone(), |existing| {
            **existing == expected_desc
        });

        if let Replacement::Replaced { .. } = &replaced {
            self.mapping.remove(&expected_desc.key());
            self.mapping.insert(new_desc.key(), new);
            if let Some(index) = l0_index {
                self.l0_delta_layers[index] = new_desc;
            }
        }

        let replaced = match replaced {
            Replacement::Replaced { in_buffered } => Replacement::Replaced { in_buffered },
            Replacement::NotFound => Replacement::NotFound,
            Replacement::RemovalBuffered => Replacement::RemovalBuffered,
            Replacement::Unexpected(x) => {
                Replacement::Unexpected(self.get_layer_from_mapping(&x.key()).clone())
            }
        };

        Ok(replaced)
    }

    /// Helper function for BatchedUpdates::drop.
@@ -454,10 +319,8 @@ where
        Ok(true)
    }

    pub fn iter_historic_layers(&self) -> impl '_ + Iterator<Item = Arc<L>> {
        self.historic
            .iter()
            .map(|x| self.get_layer_from_mapping(&x.key()).clone())
    pub fn iter_historic_layers(&self) -> impl '_ + Iterator<Item = Arc<PersistentLayerDesc>> {
        self.historic.iter()
    }

    ///
@@ -472,7 +335,7 @@ where
        &self,
        key_range: &Range<Key>,
        lsn: Lsn,
    ) -> Result<Vec<(Range<Key>, Option<Arc<L>>)>> {
    ) -> Result<Vec<(Range<Key>, Option<Arc<PersistentLayerDesc>>)>> {
        let version = match self.historic.get().unwrap().get_version(lsn.0) {
            Some(v) => v,
            None => return Ok(vec![]),
@@ -482,36 +345,26 @@ where
        let end = key_range.end.to_i128();

        // Initialize loop variables
        let mut coverage: Vec<(Range<Key>, Option<Arc<L>>)> = vec![];
        let mut coverage: Vec<(Range<Key>, Option<Arc<PersistentLayerDesc>>)> = vec![];
        let mut current_key = start;
        let mut current_val = version.image_coverage.query(start);

        // Loop through the change events and push intervals
        for (change_key, change_val) in version.image_coverage.range(start..end) {
            let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
            coverage.push((
                kr,
                current_val
                    .take()
                    .map(|l| self.get_layer_from_mapping(&l.key()).clone()),
            ));
            coverage.push((kr, current_val.take()));
            current_key = change_key;
            current_val = change_val.clone();
        }

        // Add the final interval
        let kr = Key::from_i128(current_key)..Key::from_i128(end);
        coverage.push((
            kr,
            current_val
                .take()
                .map(|l| self.get_layer_from_mapping(&l.key()).clone()),
        ));
        coverage.push((kr, current_val.take()));

        Ok(coverage)
    }

    pub fn is_l0(layer: &L) -> bool {
    pub fn is_l0(layer: &PersistentLayerDesc) -> bool {
        range_eq(&layer.get_key_range(), &(Key::MIN..Key::MAX))
    }

@@ -537,7 +390,7 @@ where
    /// TODO The optimal number should probably be slightly higher than 1, but to
    /// implement that we need to plumb a lot more context into this function
    /// than just the current partition_range.
    pub fn is_reimage_worthy(layer: &L, partition_range: &Range<Key>) -> bool {
    pub fn is_reimage_worthy(layer: &PersistentLayerDesc, partition_range: &Range<Key>) -> bool {
        // Case 1
        if !Self::is_l0(layer) {
            return true;
@@ -595,9 +448,7 @@ where
            let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
            let lr = lsn.start..val.get_lsn_range().start;
            if !kr.is_empty() {
                let base_count =
                    Self::is_reimage_worthy(self.get_layer_from_mapping(&val.key()), key)
                        as usize;
                let base_count = Self::is_reimage_worthy(&val, key) as usize;
                let new_limit = limit.map(|l| l - base_count);
                let max_stacked_deltas_underneath =
                    self.count_deltas(&kr, &lr, new_limit)?;
@@ -620,9 +471,7 @@ where
            let lr = lsn.start..val.get_lsn_range().start;

            if !kr.is_empty() {
                let base_count =
                    Self::is_reimage_worthy(self.get_layer_from_mapping(&val.key()), key)
                        as usize;
                let base_count = Self::is_reimage_worthy(&val, key) as usize;
                let new_limit = limit.map(|l| l - base_count);
                let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit)?;
                max_stacked_deltas = std::cmp::max(
@@ -772,12 +621,8 @@ where
    }

    /// Return all L0 delta layers
    pub fn get_level0_deltas(&self) -> Result<Vec<Arc<L>>> {
        Ok(self
            .l0_delta_layers
            .iter()
            .map(|x| self.get_layer_from_mapping(&x.key()).clone())
            .collect())
    pub fn get_level0_deltas(&self) -> Result<Vec<Arc<PersistentLayerDesc>>> {
        Ok(self.l0_delta_layers.to_vec())
    }

    /// debugging function to print out the contents of the layer map
@@ -802,97 +647,76 @@ where
        println!("End dump LayerMap");
        Ok(())
    }
}

    /// Similar to `Arc::ptr_eq`, but only compares the object pointers, not vtables.
    ///
    /// Returns `true` if the two `Arc` point to the same layer, false otherwise.
    #[inline(always)]
    pub fn compare_arced_layers(left: &Arc<L>, right: &Arc<L>) -> bool {
        // "dyn Trait" objects are "fat pointers" in that they have two components:
        // - pointer to the object
        // - pointer to the vtable
        //
        // rust does not provide a guarantee that these vtables are unique; however,
        // `Arc::ptr_eq` as of writing (at least up to 1.67) uses a comparison where both the
        // pointer and the vtable need to be equal.
        //
        // See: https://github.com/rust-lang/rust/issues/103763
        //
        // A future version of rust will most likely use this form below, where we cast each
        // pointer into a pointer to unit, which drops the inaccessible vtable pointer, making it
        // not affect the comparison.
        //
        // See: https://github.com/rust-lang/rust/pull/106450
        let left = Arc::as_ptr(left) as *const ();
        let right = Arc::as_ptr(right) as *const ();

        left == right
    }

/// Similar to `Arc::ptr_eq`, but only compares the object pointers, not vtables.
///
/// Returns `true` if the two `Arc` point to the same layer, false otherwise.
///
/// If comparing persistent layers, ALWAYS compare the layer descriptor key.
#[inline(always)]
pub fn compare_arced_layers<L: ?Sized>(left: &Arc<L>, right: &Arc<L>) -> bool {
    // "dyn Trait" objects are "fat pointers" in that they have two components:
    // - pointer to the object
    // - pointer to the vtable
    //
    // rust does not provide a guarantee that these vtables are unique; however,
    // `Arc::ptr_eq` as of writing (at least up to 1.67) uses a comparison where both the
    // pointer and the vtable need to be equal.
    //
    // See: https://github.com/rust-lang/rust/issues/103763
    //
    // A future version of rust will most likely use this form below, where we cast each
    // pointer into a pointer to unit, which drops the inaccessible vtable pointer, making it
    // not affect the comparison.
    //
    // See: https://github.com/rust-lang/rust/pull/106450
    let left = Arc::as_ptr(left) as *const ();
    let right = Arc::as_ptr(right) as *const ();

    left == right
}
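The pointer-to-unit cast above is the whole trick: it erases the vtable half of the fat pointer before comparing. A self-contained demonstration, independent of pageserver types:

    use std::sync::Arc;

    trait Obj {}
    struct Thing;
    impl Obj for Thing {}

    fn main() {
        let a: Arc<dyn Obj> = Arc::new(Thing);
        let b = a.clone();
        // Casting to *const () keeps only the data pointer, so two Arcs that
        // share an allocation compare equal even if their vtable pointers differ.
        let left = Arc::as_ptr(&a) as *const ();
        let right = Arc::as_ptr(&b) as *const ();
        assert!(left == right);
    }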
#[cfg(test)]
mod tests {
    use super::{LayerMap, Replacement};
    use crate::tenant::storage_layer::{Layer, LayerDescriptor, LayerFileName};
    use super::LayerMap;
    use crate::tenant::storage_layer::{tests::LayerDescriptor, LayerFileName};
    use std::str::FromStr;
    use std::sync::Arc;

    mod l0_delta_layers_updated {

        use crate::tenant::storage_layer::{PersistentLayer, PersistentLayerDesc};

        use super::*;

        #[test]
        fn for_full_range_delta() {
            // l0_delta_layers are used by compaction, and should observe all buffered updates
            l0_delta_layers_updated_scenario(
                "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000053423C21-0000000053424D69",
                true
            )
        }

        #[test]
        fn for_non_full_range_delta() {
            // has minimal uncovered areas compared to l0_delta_layers_updated_on_insert_replace_remove_for_full_range_delta
            l0_delta_layers_updated_scenario(
                "000000000000000000000000000000000001-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFE__0000000053423C21-0000000053424D69",
                // because not full range
                false
            )
        }

        #[test]
        fn for_image() {
            l0_delta_layers_updated_scenario(
                "000000000000000000000000000000000000-000000000000000000000000000000010000__0000000053424D69",
                // code only checks if it is a full range layer, doesn't care about images, which must
                // mean we should in practice never have full range images
                false
            )
        }

        #[test]
        fn replacing_missing_l0_is_notfound() {
            // original impl had an oversight, and L0 was an anyhow::Error. anyhow::Error should
            // however only happen for precondition failures.

            let layer = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000053423C21-0000000053424D69";
            let layer = LayerFileName::from_str(layer).unwrap();
            let layer = LayerDescriptor::from(layer);

            // same skeletal construction; see scenario below
            let not_found = Arc::new(layer.clone());
            let new_version = Arc::new(layer);

            let mut map = LayerMap::default();

            let res = map.batch_update().replace_historic(
                not_found.get_persistent_layer_desc(),
                &not_found,
                new_version.get_persistent_layer_desc(),
                new_version,
            );

            assert!(matches!(res, Ok(Replacement::NotFound)), "{res:?}");
        }

        fn l0_delta_layers_updated_scenario(layer_name: &str, expected_l0: bool) {
@@ -906,46 +730,31 @@ mod tests {

            // two disjoint Arcs in different lifecycle phases. even if it seems they must be the
            // same layer, we use LayerMap::compare_arced_layers as the identity of layers.
            assert!(!LayerMap::compare_arced_layers(&remote, &downloaded));
            assert_eq!(remote.layer_desc(), downloaded.layer_desc());

            let expected_in_counts = (1, usize::from(expected_l0));

            map.batch_update()
                .insert_historic(remote.get_persistent_layer_desc(), remote.clone());
            assert_eq!(count_layer_in(&map, &remote), expected_in_counts);

            let replaced = map
                .batch_update()
                .replace_historic(
                    remote.get_persistent_layer_desc(),
                    &remote,
                    downloaded.get_persistent_layer_desc(),
                    downloaded.clone(),
                )
                .expect("name derived attributes are the same");
            assert!(
                matches!(replaced, Replacement::Replaced { .. }),
                "{replaced:?}"
                .insert_historic(remote.layer_desc().clone());
            assert_eq!(
                count_layer_in(&map, remote.layer_desc()),
                expected_in_counts
            );
            assert_eq!(count_layer_in(&map, &downloaded), expected_in_counts);

            map.batch_update()
                .remove_historic(downloaded.get_persistent_layer_desc(), downloaded.clone());
            assert_eq!(count_layer_in(&map, &downloaded), (0, 0));
                .remove_historic(downloaded.layer_desc().clone());
            assert_eq!(count_layer_in(&map, downloaded.layer_desc()), (0, 0));
        }

        fn count_layer_in<L: Layer + ?Sized>(map: &LayerMap<L>, layer: &Arc<L>) -> (usize, usize) {
        fn count_layer_in(map: &LayerMap, layer: &PersistentLayerDesc) -> (usize, usize) {
            let historic = map
                .iter_historic_layers()
                .filter(|x| LayerMap::compare_arced_layers(x, layer))
                .filter(|x| x.key() == layer.key())
                .count();
            let l0s = map
                .get_level0_deltas()
                .expect("why does this return a result");
            let l0 = l0s
                .iter()
                .filter(|x| LayerMap::compare_arced_layers(x, layer))
                .count();
            let l0 = l0s.iter().filter(|x| x.key() == layer.key()).count();

            (historic, l0)
        }

@@ -3,6 +3,8 @@ use std::ops::Range;

use tracing::info;

use crate::tenant::storage_layer::PersistentLayerDesc;

use super::layer_coverage::LayerCoverageTuple;

/// Layers in this module are identified and indexed by this data.
@@ -53,11 +55,24 @@ impl<'a, L: crate::tenant::storage_layer::Layer + ?Sized> From<&'a L> for LayerK
    }
}

impl From<&PersistentLayerDesc> for LayerKey {
    fn from(layer: &PersistentLayerDesc) -> Self {
        let kr = layer.get_key_range();
        let lr = layer.get_lsn_range();
        LayerKey {
            key: kr.start.to_i128()..kr.end.to_i128(),
            lsn: lr.start.0..lr.end.0,
            is_image: !layer.is_incremental(),
        }
    }
}

/// Efficiently queryable layer coverage for each LSN.
///
/// Allows answering layer map queries very efficiently,
/// but doesn't allow retroactive insertion, which is
/// sometimes necessary. See BufferedHistoricLayerCoverage.
#[derive(Clone)]
pub struct HistoricLayerCoverage<Value> {
    /// The latest state
    head: LayerCoverageTuple<Value>,
@@ -411,6 +426,7 @@ fn test_persistent_overlapping() {
///
/// See this for more on persistent and retroactive techniques:
/// https://www.youtube.com/watch?v=WqCWghETNDc&t=581s
#[derive(Clone)]
pub struct BufferedHistoricLayerCoverage<Value> {
    /// A persistent layer map that we rebuild when we need to retroactively update
    historic_coverage: HistoricLayerCoverage<Value>,
@@ -467,6 +483,11 @@ impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
    ///
    /// Returns a `Replacement` value describing the outcome; only the case of
    /// `Replacement::Replaced` modifies the map and requires a rebuild.
    ///
    /// This function is unlikely to be used in the future because LayerMap now only records the
    /// layer descriptors. Therefore, anything added to the layer map will only be removed or
    /// added, and never replaced.
    #[allow(dead_code)]
    pub fn replace<F>(
        &mut self,
        layer_key: &LayerKey,

@@ -15,6 +15,7 @@ use rpds::RedBlackTreeMapSync;
///
/// NOTE The struct is parameterized over Value for easier
/// testing, but in practice it's some sort of layer.
#[derive(Clone)]
pub struct LayerCoverage<Value> {
    /// For every change in coverage (as we sweep the key space)
    /// we store (lsn.end, value).
@@ -139,6 +140,7 @@ impl<Value: Clone> LayerCoverage<Value> {
}

/// Image and delta coverage at a specific LSN.
#[derive(Clone)]
pub struct LayerCoverageTuple<Value> {
    pub image_coverage: LayerCoverage<Value>,
    pub delta_coverage: LayerCoverage<Value>,

pageserver/src/tenant/layer_map_mgr.rs (new file, 146 lines)
@@ -0,0 +1,146 @@
//! This module implements `LayerMapMgr`, which manages a layer map object and provides lock-free access to the state.
//!
//! A common usage pattern is as follows:
//!
//! ```ignore
//! async fn compaction(&self) {
//!     // Get the current state.
//!     let state = self.layer_map_mgr.read();
//!     // No lock held at this point. Do compaction based on the state. This part usually incurs I/O operations and may
//!     // take a long time.
//!     let compaction_result = self.do_compaction(&state).await?;
//!     // Update the state.
//!     self.layer_map_mgr.update(|mut state| async move {
//!         // do updates to the state, return it.
//!         Ok(state)
//!     }).await?;
//! }
//! ```
use anyhow::Result;
use arc_swap::ArcSwap;
use futures::Future;
use std::sync::Arc;

use super::layer_map::LayerMap;

/// Manages the storage state. Provides utility functions to modify the layer map and to get an
/// immutable reference to the layer map.
pub struct LayerMapMgr {
    layer_map: ArcSwap<LayerMap>,
    state_lock: tokio::sync::Mutex<()>,
}

impl LayerMapMgr {
    /// Get the current state of the layer map.
    pub fn read(&self) -> Arc<LayerMap> {
        // TODO: it is possible to use `load` instead to avoid the overhead of cloning the Arc, but
        // the read path usually involves disk reads and layer mapping fetches anyway, so the extra
        // cost of `load_full` is negligible here.
        self.layer_map.load_full()
    }

    /// Clone the layer map for modification.
    fn clone_for_write(&self, _state_lock_witness: &tokio::sync::MutexGuard<'_, ()>) -> LayerMap {
        (**self.layer_map.load()).clone()
    }

    pub fn new(layer_map: LayerMap) -> Self {
        Self {
            layer_map: ArcSwap::new(Arc::new(layer_map)),
            state_lock: tokio::sync::Mutex::new(()),
        }
    }

    /// Update the layer map.
    pub async fn update<O, F>(&self, operation: O) -> Result<()>
    where
        O: FnOnce(LayerMap) -> F,
        F: Future<Output = Result<LayerMap>>,
    {
        let state_lock = self.state_lock.lock().await;
        let state = self.clone_for_write(&state_lock);
        let new_state = operation(state).await?;
        self.layer_map.store(Arc::new(new_state));
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use utils::{
        id::{TenantId, TimelineId},
        lsn::Lsn,
    };

    use crate::{repository::Key, tenant::storage_layer::PersistentLayerDesc};

    use super::*;

    #[tokio::test]
    async fn test_layer_map_manage() -> Result<()> {
        let mgr = LayerMapMgr::new(Default::default());
        mgr.update(|mut map| async move {
            let mut updates = map.batch_update();
            updates.insert_historic(PersistentLayerDesc::new_img(
                TenantId::generate(),
                TimelineId::generate(),
                Key::from_i128(0)..Key::from_i128(1),
                Lsn(0),
                false,
                0,
            ));
            updates.flush();
            Ok(map)
        })
        .await?;

        let ref_1 = mgr.read();

        mgr.update(|mut map| async move {
            let mut updates = map.batch_update();
            updates.insert_historic(PersistentLayerDesc::new_img(
                TenantId::generate(),
                TimelineId::generate(),
                Key::from_i128(1)..Key::from_i128(2),
                Lsn(0),
                false,
                0,
            ));
            updates.flush();
            Ok(map)
        })
        .await?;

        let ref_2 = mgr.read();

        // Modification should not be visible to the old reference.
        assert_eq!(
            ref_1
                .search(Key::from_i128(0), Lsn(1))
                .unwrap()
                .layer
                .key_range,
            Key::from_i128(0)..Key::from_i128(1)
        );
        assert!(ref_1.search(Key::from_i128(1), Lsn(1)).is_none());

        // Modification should be visible to the new reference.
        assert_eq!(
            ref_2
                .search(Key::from_i128(0), Lsn(1))
                .unwrap()
                .layer
                .key_range,
            Key::from_i128(0)..Key::from_i128(1)
        );
        assert_eq!(
            ref_2
                .search(Key::from_i128(1), Lsn(1))
                .unwrap()
                .layer
                .key_range,
            Key::from_i128(1)..Key::from_i128(2)
        );
        Ok(())
    }
}
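Underneath `LayerMapMgr` is a plain copy-on-write pattern on top of `arc-swap`: readers take a cheap atomic snapshot, while writers serialize on a mutex, clone the current state, mutate the clone, and publish it atomically. A stripped-down, self-contained sketch of the same idea:

    use arc_swap::ArcSwap;
    use std::sync::{Arc, Mutex};

    struct CowState {
        state: ArcSwap<Vec<u32>>,
        write_lock: Mutex<()>,
    }

    impl CowState {
        fn read(&self) -> Arc<Vec<u32>> {
            self.state.load_full() // lock-free snapshot for readers
        }

        fn update(&self, f: impl FnOnce(&mut Vec<u32>)) {
            let _guard = self.write_lock.lock().unwrap(); // one writer at a time
            let mut next = (**self.state.load()).clone(); // copy-on-write
            f(&mut next);
            self.state.store(Arc::new(next)); // publish atomically
        }
    }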
@@ -1392,7 +1392,12 @@ mod tests {
        let harness = TenantHarness::create(test_name)?;
        let (tenant, ctx) = runtime.block_on(harness.load());
        // create an empty timeline directory
        let _ = tenant.create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)?;
        let _ = runtime.block_on(tenant.create_test_timeline(
            TIMELINE_ID,
            Lsn(8),
            DEFAULT_PG_VERSION,
            &ctx,
        ))?;

        let remote_fs_dir = harness.conf.workdir.join("remote_fs");
        std::fs::create_dir_all(remote_fs_dir)?;

@@ -41,8 +41,6 @@ pub use inmemory_layer::InMemoryLayer;
pub use layer_desc::{PersistentLayerDesc, PersistentLayerKey};
pub use remote_layer::RemoteLayer;

use super::layer_map::BatchedUpdates;

pub fn range_overlaps<T>(a: &Range<T>, b: &Range<T>) -> bool
where
    T: PartialOrd<T>,
@@ -176,19 +174,9 @@ impl LayerAccessStats {
    /// Create an empty stats object and record a [`LayerLoad`] event with the given residence status.
    ///
    /// See [`record_residence_event`] for why you need to do this while holding the layer map lock.
    pub(crate) fn for_loading_layer<L>(
        layer_map_lock_held_witness: &BatchedUpdates<'_, L>,
        status: LayerResidenceStatus,
    ) -> Self
    where
        L: ?Sized + Layer,
    {
    pub(crate) fn for_loading_layer(status: LayerResidenceStatus) -> Self {
        let new = LayerAccessStats(Mutex::new(LayerAccessStatsLocked::default()));
        new.record_residence_event(
            layer_map_lock_held_witness,
            status,
            LayerResidenceEventReason::LayerLoad,
        );
        new.record_residence_event(status, LayerResidenceEventReason::LayerLoad);
        new
    }

@@ -197,24 +185,16 @@ impl LayerAccessStats {
    /// The `new_status` is not recorded in `self`.
    ///
    /// See [`record_residence_event`] for why you need to do this while holding the layer map lock.
    pub(crate) fn clone_for_residence_change<L>(
    pub(crate) fn clone_for_residence_change(
        &self,
        layer_map_lock_held_witness: &BatchedUpdates<'_, L>,
        new_status: LayerResidenceStatus,
    ) -> LayerAccessStats
    where
        L: ?Sized + Layer,
    {
    ) -> LayerAccessStats {
        let clone = {
            let inner = self.0.lock().unwrap();
            inner.clone()
        };
        let new = LayerAccessStats(Mutex::new(clone));
        new.record_residence_event(
            layer_map_lock_held_witness,
            new_status,
            LayerResidenceEventReason::ResidenceChange,
        );
        new.record_residence_event(new_status, LayerResidenceEventReason::ResidenceChange);
        new
    }

@@ -232,14 +212,11 @@ impl LayerAccessStats {
    /// - Compact: Grab layer map lock, add the new L1 to layer map and remove the L0s, release layer map lock.
    /// - Eviction: observes the new L1 layer whose only activity timestamp is the LayerCreate event.
    ///
    pub(crate) fn record_residence_event<L>(
    pub(crate) fn record_residence_event(
        &self,
        _layer_map_lock_held_witness: &BatchedUpdates<'_, L>,
        status: LayerResidenceStatus,
        reason: LayerResidenceEventReason,
    ) where
        L: ?Sized + Layer,
    {
    ) {
        let mut locked = self.0.lock().unwrap();
        locked.iter_mut().for_each(|inner| {
            inner
@@ -389,10 +366,10 @@ pub trait Layer: std::fmt::Debug + Send + Sync {
}

/// Returned by [`Layer::iter`]
pub type LayerIter<'i> = Box<dyn Iterator<Item = Result<(Key, Lsn, Value)>> + 'i>;
pub type LayerIter<'i> = Box<dyn Iterator<Item = Result<(Key, Lsn, Value)>> + 'i + Send>;

/// Returned by [`Layer::key_iter`]
pub type LayerKeyIter<'i> = Box<dyn Iterator<Item = (Key, Lsn, u64)> + 'i>;
pub type LayerKeyIter<'i> = Box<dyn Iterator<Item = (Key, Lsn, u64)> + 'i + Send>;

/// A Layer contains all data in a "rectangle" consisting of a range of keys and
/// range of LSNs.
@@ -473,94 +450,125 @@ pub fn downcast_remote_layer(
    }
}

/// Holds metadata about a layer without any content. Used mostly for testing.
///
/// To use filenames as fixtures, parse them as [`LayerFileName`] then convert from that to a
/// LayerDescriptor.
#[derive(Clone, Debug)]
pub struct LayerDescriptor {
    pub key: Range<Key>,
    pub lsn: Range<Lsn>,
    pub is_incremental: bool,
    pub short_id: String,
}

impl LayerDescriptor {
    /// `LayerDescriptor` is only used for testing purposes so it does not matter whether it is image / delta,
    /// and the tenant / timeline id does not matter.
    pub fn get_persistent_layer_desc(&self) -> PersistentLayerDesc {
        PersistentLayerDesc::new_delta(
            TenantId::from_array([0; 16]),
            TimelineId::from_array([0; 16]),
            self.key.clone(),
            self.lsn.clone(),
            233,
        )
    }
}

impl Layer for LayerDescriptor {
    fn get_key_range(&self) -> Range<Key> {
        self.key.clone()
    }

    fn get_lsn_range(&self) -> Range<Lsn> {
        self.lsn.clone()
    }

    fn is_incremental(&self) -> bool {
        self.is_incremental
    }

    fn get_value_reconstruct_data(
        &self,
        _key: Key,
        _lsn_range: Range<Lsn>,
        _reconstruct_data: &mut ValueReconstructState,
        _ctx: &RequestContext,
    ) -> Result<ValueReconstructResult> {
        todo!("This method shouldn't be part of the Layer trait")
    }

    fn short_id(&self) -> String {
        self.short_id.clone()
    }

    fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
        todo!()
    }
}

impl From<DeltaFileName> for LayerDescriptor {
    fn from(value: DeltaFileName) -> Self {
        let short_id = value.to_string();
        LayerDescriptor {
            key: value.key_range,
            lsn: value.lsn_range,
            is_incremental: true,
            short_id,
        }
    }
}

impl From<ImageFileName> for LayerDescriptor {
    fn from(value: ImageFileName) -> Self {
        let short_id = value.to_string();
        let lsn = value.lsn_as_range();
        LayerDescriptor {
            key: value.key_range,
            lsn,
            is_incremental: false,
            short_id,
        }
    }
}

impl From<LayerFileName> for LayerDescriptor {
    fn from(value: LayerFileName) -> Self {
        match value {
            LayerFileName::Delta(d) => Self::from(d),
            LayerFileName::Image(i) => Self::from(i),
        }
    }
}

pub mod tests {
    use super::*;

    /// Holds metadata about a layer without any content. Used mostly for testing.
    ///
    /// To use filenames as fixtures, parse them as [`LayerFileName`] then convert from that to a
    /// LayerDescriptor.
    #[derive(Clone, Debug)]
    pub struct LayerDescriptor {
        base: PersistentLayerDesc,
    }

    impl From<PersistentLayerDesc> for LayerDescriptor {
        fn from(base: PersistentLayerDesc) -> Self {
            Self { base }
        }
    }

    impl Layer for LayerDescriptor {
        fn get_value_reconstruct_data(
            &self,
            _key: Key,
            _lsn_range: Range<Lsn>,
            _reconstruct_data: &mut ValueReconstructState,
            _ctx: &RequestContext,
        ) -> Result<ValueReconstructResult> {
            todo!("This method shouldn't be part of the Layer trait")
        }

        fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
            todo!()
        }

        /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
        fn get_key_range(&self) -> Range<Key> {
            self.layer_desc().key_range.clone()
        }

        /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
        fn get_lsn_range(&self) -> Range<Lsn> {
            self.layer_desc().lsn_range.clone()
        }

        /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
        fn is_incremental(&self) -> bool {
            self.layer_desc().is_incremental
        }

        /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
        fn short_id(&self) -> String {
            self.layer_desc().short_id()
        }
    }

    impl PersistentLayer for LayerDescriptor {
        fn layer_desc(&self) -> &PersistentLayerDesc {
            &self.base
        }

        fn local_path(&self) -> Option<PathBuf> {
            unimplemented!()
        }

        fn iter(&self, _: &RequestContext) -> Result<LayerIter<'_>> {
            unimplemented!()
        }

        fn key_iter(&self, _: &RequestContext) -> Result<LayerKeyIter<'_>> {
            unimplemented!()
        }

        fn delete_resident_layer_file(&self) -> Result<()> {
            unimplemented!()
        }

        fn info(&self, _: LayerAccessStatsReset) -> HistoricLayerInfo {
            unimplemented!()
        }

        fn access_stats(&self) -> &LayerAccessStats {
            unimplemented!()
        }
    }

    impl From<DeltaFileName> for LayerDescriptor {
        fn from(value: DeltaFileName) -> Self {
            LayerDescriptor {
                base: PersistentLayerDesc::new_delta(
                    TenantId::from_array([0; 16]),
                    TimelineId::from_array([0; 16]),
                    value.key_range,
                    value.lsn_range,
                    233,
                ),
            }
        }
    }

    impl From<ImageFileName> for LayerDescriptor {
        fn from(value: ImageFileName) -> Self {
            LayerDescriptor {
                base: PersistentLayerDesc::new_img(
                    TenantId::from_array([0; 16]),
                    TimelineId::from_array([0; 16]),
                    value.key_range,
                    value.lsn,
                    false,
                    233,
                ),
            }
        }
    }

    impl From<LayerFileName> for LayerDescriptor {
        fn from(value: LayerFileName) -> Self {
            match value {
                LayerFileName::Delta(d) => Self::from(d),
                LayerFileName::Image(i) => Self::from(i),
            }
        }
    }
}

@@ -37,6 +37,7 @@ use crate::virtual_file::VirtualFile;
use crate::{walrecord, TEMP_FILE_SUFFIX};
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
use anyhow::{bail, ensure, Context, Result};
use once_cell::sync::OnceCell;
use pageserver_api::models::{HistoricLayerInfo, LayerAccessKind};
use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize};
@@ -46,7 +47,6 @@ use std::io::{Seek, SeekFrom};
use std::ops::Range;
use std::os::unix::fs::FileExt;
use std::path::{Path, PathBuf};
use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use tracing::*;

use utils::{
@@ -184,7 +184,7 @@ pub struct DeltaLayer {

    access_stats: LayerAccessStats,

    inner: RwLock<DeltaLayerInner>,
    inner: OnceCell<DeltaLayerInner>,
}

impl std::fmt::Debug for DeltaLayer {
@@ -201,21 +201,17 @@ impl std::fmt::Debug for DeltaLayer {
}

pub struct DeltaLayerInner {
    /// If false, the fields below have not been loaded into memory yet.
    loaded: bool,

    // values copied from summary
    index_start_blk: u32,
    index_root_blk: u32,

    /// Reader object for reading blocks from the file. (None if not loaded yet)
    file: Option<FileBlockReader<VirtualFile>>,
    /// Reader object for reading blocks from the file.
    file: FileBlockReader<VirtualFile>,
}

impl std::fmt::Debug for DeltaLayerInner {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DeltaLayerInner")
            .field("loaded", &self.loaded)
            .field("index_start_blk", &self.index_start_blk)
            .field("index_root_blk", &self.index_root_blk)
            .finish()
@@ -246,7 +242,7 @@ impl Layer for DeltaLayer {
            inner.index_start_blk, inner.index_root_blk
        );

        let file = inner.file.as_ref().unwrap();
        let file = &inner.file;
        let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
            inner.index_start_blk,
            inner.index_root_blk,
@@ -315,7 +311,7 @@ impl Layer for DeltaLayer {
        let inner = self.load(LayerAccessKind::GetValueReconstructData, ctx)?;

        // Scan the page versions backwards, starting from `lsn`.
        let file = inner.file.as_ref().unwrap();
        let file = &inner.file;
        let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
            inner.index_start_blk,
            inner.index_root_blk,
@@ -500,51 +496,22 @@ impl DeltaLayer {
    /// Open the underlying file and read the metadata into memory, if it's
    /// not loaded already.
    ///
    fn load(
        &self,
        access_kind: LayerAccessKind,
        ctx: &RequestContext,
    ) -> Result<RwLockReadGuard<DeltaLayerInner>> {
    fn load(&self, access_kind: LayerAccessKind, ctx: &RequestContext) -> Result<&DeltaLayerInner> {
        self.access_stats
            .record_access(access_kind, ctx.task_kind());
        loop {
            // Quick exit if already loaded
            let inner = self.inner.read().unwrap();
            if inner.loaded {
                return Ok(inner);
            }

            // Need to open the file and load the metadata. Upgrade our lock to
            // a write lock. (Or rather, release and re-lock in write mode.)
            drop(inner);
            let inner = self.inner.write().unwrap();
            if !inner.loaded {
                self.load_inner(inner).with_context(|| {
                    format!("Failed to load delta layer {}", self.path().display())
                })?;
            } else {
                // Another thread loaded it while we were not holding the lock.
            }

            // We now have the file open and loaded. There's no function to do
            // that in the std library RwLock, so we have to release and re-lock
            // in read mode. (To be precise, the lock guard was moved in the
            // above call to `load_inner`, so it's already been released). And
            // while we do that, another thread could unload again, so we have
            // to re-check and retry if that happens.
        }
        // Quick exit if already loaded
        self.inner
            .get_or_try_init(|| self.load_inner())
            .with_context(|| format!("Failed to load delta layer {}", self.path().display()))
    }

    fn load_inner(&self, mut inner: RwLockWriteGuard<DeltaLayerInner>) -> Result<()> {
    fn load_inner(&self) -> Result<DeltaLayerInner> {
        let path = self.path();

        // Open the file if it's not open already.
        if inner.file.is_none() {
            let file = VirtualFile::open(&path)
                .with_context(|| format!("Failed to open file '{}'", path.display()))?;
            inner.file = Some(FileBlockReader::new(file));
        }
        let file = inner.file.as_mut().unwrap();
        let file = VirtualFile::open(&path)
            .with_context(|| format!("Failed to open file '{}'", path.display()))?;
        let file = FileBlockReader::new(file);

        let summary_blk = file.read_blk(0)?;
        let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;

@@ -571,13 +538,13 @@ impl DeltaLayer {
        }
    }

        inner.index_start_blk = actual_summary.index_start_blk;
        inner.index_root_blk = actual_summary.index_root_blk;

        debug!("loaded from {}", &path.display());

        inner.loaded = true;
        Ok(())
        Ok(DeltaLayerInner {
            file,
            index_start_blk: actual_summary.index_start_blk,
            index_root_blk: actual_summary.index_root_blk,
        })
    }
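The refactor above replaces a hand-rolled double-checked RwLock dance with `once_cell`. A minimal sketch of the same lazy-load pattern in isolation (the file name is invented for the example): `get_or_try_init` runs the fallible initializer at most once, and every later call returns a plain shared reference with no locking.

    use once_cell::sync::OnceCell;

    struct LazyMetadata {
        inner: OnceCell<String>,
    }

    impl LazyMetadata {
        fn load(&self) -> anyhow::Result<&String> {
            self.inner.get_or_try_init(|| {
                // Runs at most once, even with concurrent callers.
                std::fs::read_to_string("metadata.txt").map_err(Into::into)
            })
        }
    }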

    /// Create a DeltaLayer struct representing an existing file on disk.
@@ -599,12 +566,7 @@ impl DeltaLayer {
                file_size,
            ),
            access_stats,
            inner: RwLock::new(DeltaLayerInner {
                loaded: false,
                file: None,
                index_start_blk: 0,
                index_root_blk: 0,
            }),
            inner: once_cell::sync::OnceCell::new(),
        }
    }

@@ -631,12 +593,7 @@ impl DeltaLayer {
                metadata.len(),
            ),
            access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
            inner: RwLock::new(DeltaLayerInner {
                loaded: false,
                file: None,
                index_start_blk: 0,
                index_root_blk: 0,
            }),
            inner: once_cell::sync::OnceCell::new(),
        })
    }

@@ -800,12 +757,7 @@ impl DeltaLayerWriterInner {
                metadata.len(),
            ),
            access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
            inner: RwLock::new(DeltaLayerInner {
                loaded: false,
                file: None,
                index_start_blk,
                index_root_blk,
            }),
            inner: once_cell::sync::OnceCell::new(),
        };

        // fsync the file
@@ -940,13 +892,13 @@ struct DeltaValueIter<'a> {
    reader: BlockCursor<Adapter<'a>>,
}

struct Adapter<'a>(RwLockReadGuard<'a, DeltaLayerInner>);
struct Adapter<'a>(&'a DeltaLayerInner);

impl<'a> BlockReader for Adapter<'a> {
    type BlockLease = PageReadGuard<'static>;

    fn read_blk(&self, blknum: u32) -> Result<Self::BlockLease, std::io::Error> {
        self.0.file.as_ref().unwrap().read_blk(blknum)
        self.0.file.read_blk(blknum)
    }
}

@@ -959,8 +911,8 @@ impl<'a> Iterator for DeltaValueIter<'a> {
}

impl<'a> DeltaValueIter<'a> {
    fn new(inner: RwLockReadGuard<'a, DeltaLayerInner>) -> Result<Self> {
        let file = inner.file.as_ref().unwrap();
    fn new(inner: &'a DeltaLayerInner) -> Result<Self> {
        let file = &inner.file;
        let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
            inner.index_start_blk,
            inner.index_root_blk,
@@ -1033,8 +985,8 @@ impl Iterator for DeltaKeyIter {
}

impl<'a> DeltaKeyIter {
    fn new(inner: RwLockReadGuard<'a, DeltaLayerInner>) -> Result<Self> {
        let file = inner.file.as_ref().unwrap();
    fn new(inner: &'a DeltaLayerInner) -> Result<Self> {
        let file = &inner.file;
        let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
            inner.index_start_blk,
            inner.index_root_blk,
@@ -1074,3 +1026,21 @@ impl<'a> DeltaKeyIter {
        Ok(iter)
    }
}

#[cfg(test)]
mod test {
    use super::DeltaKeyIter;
    use super::DeltaLayer;
    use super::DeltaValueIter;

    // We will soon need the iters to be send in the compaction code.
    // Cf https://github.com/neondatabase/neon/pull/4462#issuecomment-1587398883
    // Cf https://github.com/neondatabase/neon/issues/4471
    #[test]
    fn is_send() {
        fn assert_send<T: Send>() {}
        assert_send::<DeltaLayer>();
        assert_send::<DeltaValueIter>();
        assert_send::<DeltaKeyIter>();
    }
}

@@ -304,7 +304,7 @@ impl InMemoryLayer {
        Ok(())
    }

    pub fn put_tombstone(&self, _key_range: Range<Key>, _lsn: Lsn) -> Result<()> {
    pub async fn put_tombstone(&self, _key_range: Range<Key>, _lsn: Lsn) -> Result<()> {
        // TODO: Currently, we just leak the storage for any deleted keys

        Ok(())

@@ -4,7 +4,6 @@
use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::repository::Key;
use crate::tenant::layer_map::BatchedUpdates;
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
use anyhow::{bail, Result};
@@ -218,15 +217,11 @@ impl RemoteLayer {
    }

    /// Create a Layer struct representing this layer, after it has been downloaded.
    pub fn create_downloaded_layer<L>(
    pub fn create_downloaded_layer(
        &self,
        layer_map_lock_held_witness: &BatchedUpdates<'_, L>,
        conf: &'static PageServerConf,
        file_size: u64,
    ) -> Arc<dyn PersistentLayer>
    where
        L: ?Sized + Layer,
    {
    ) -> Arc<dyn PersistentLayer> {
        if self.desc.is_delta {
            let fname = self.desc.delta_file_name();
            Arc::new(DeltaLayer::new(
@@ -235,10 +230,8 @@ impl RemoteLayer {
                self.desc.tenant_id,
                &fname,
                file_size,
                self.access_stats.clone_for_residence_change(
                    layer_map_lock_held_witness,
                    LayerResidenceStatus::Resident,
                ),
                self.access_stats
                    .clone_for_residence_change(LayerResidenceStatus::Resident),
            ))
        } else {
            let fname = self.desc.image_file_name();
@@ -248,10 +241,8 @@ impl RemoteLayer {
                self.desc.tenant_id,
                &fname,
                file_size,
                self.access_stats.clone_for_residence_change(
                    layer_map_lock_held_witness,
                    LayerResidenceStatus::Resident,
                ),
                self.access_stats
                    .clone_for_residence_change(LayerResidenceStatus::Resident),
            ))
        }
    }
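The dropped layer_map_lock_held_witness parameter was a compile-time lock witness: requiring a reference to the lock's guard type proves the caller holds the layer map lock while the method runs. A simplified sketch of that pattern (names invented, not the pageserver's types):

struct LayerMapGuard; // stands in for BatchedUpdates<'_, L>

fn needs_lock_held(_witness: &LayerMapGuard) {
    // can only be called while some LayerMapGuard is alive
}

fn lock_free() {
    // callable from anywhere; after this PR the locking discipline is
    // enforced by convention rather than by the type system
}

fn main() {
    let guard = LayerMapGuard;
    needs_lock_held(&guard);
    lock_free();
}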
File diff suppressed because it is too large
@@ -197,9 +197,11 @@ impl Timeline {
        // We don't want to hold the layer map lock during eviction.
        // So, we just need to deal with this.
        let candidates: Vec<Arc<dyn PersistentLayer>> = {
            let layers = self.layers.read().unwrap();
            let guard = self.lcache.layer_in_use_read().await;
            let layers = self.layer_mgr.read();
            let mut candidates = Vec::new();
            for hist_layer in layers.iter_historic_layers() {
                let hist_layer = self.lcache.get_from_desc(&hist_layer);
                if hist_layer.is_remote_layer() {
                    continue;
                }
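The block above follows a collect-then-release pattern: candidate layers are cloned out while a read guard is held in a narrow scope, and the slow eviction work runs only after the guard is dropped. A minimal sketch of the pattern (names invented):

use std::sync::{Arc, RwLock};

fn evict_all(layers: &RwLock<Vec<Arc<str>>>) {
    let candidates: Vec<Arc<str>> = {
        let guard = layers.read().unwrap();
        guard.iter().cloned().collect()
    }; // the read guard is dropped here, before any eviction I/O starts
    for layer in candidates {
        // eviction work proceeds without holding the layer map lock
        println!("evicting {layer}");
    }
}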
@@ -1325,6 +1325,7 @@ mod tests {
        let (tenant, ctx) = harness.load().await;
        let timeline = tenant
            .create_test_timeline(TIMELINE_ID, Lsn(0x8), crate::DEFAULT_PG_VERSION, &ctx)
            .await
            .expect("Failed to create an empty timeline for dummy wal connection manager");

        ConnectionManagerState {
@@ -304,12 +304,15 @@ pub(super) async fn handle_walreceiver_connection(
            }
        }

        timeline.check_checkpoint_distance().with_context(|| {
            format!(
                "Failed to check checkpoint distance for timeline {}",
                timeline.timeline_id
            )
        })?;
        timeline
            .check_checkpoint_distance()
            .await
            .with_context(|| {
                format!(
                    "Failed to check checkpoint distance for timeline {}",
                    timeline.timeline_id
                )
            })?;

        if let Some(last_lsn) = status_update {
            let timeline_remote_consistent_lsn =
@@ -333,7 +333,7 @@ impl<'a> WalIngest<'a> {

        // Now that this record has been fully handled, including updating the
        // checkpoint data, let the repository know that it is up-to-date to this LSN
        modification.commit()?;
        modification.commit().await?;

        Ok(())
    }
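This hunk and the test hunks below apply the same mechanical sync-to-async migration: commit() now returns a future, so every call site changes from m.commit()? to m.commit().await?. A sketch of the pattern with a stand-in Modification type (using anyhow, as the surrounding code does):

struct Modification;

impl Modification {
    async fn commit(&mut self) -> anyhow::Result<()> {
        // the real method persists the batched changes; this stub only
        // illustrates the new async signature
        Ok(())
    }
}

async fn ingest_one_record() -> anyhow::Result<()> {
    let mut m = Modification;
    m.commit().await?; // previously: m.commit()?;
    Ok(())
}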
@@ -1199,7 +1199,7 @@ mod tests {
        let mut m = tline.begin_modification(Lsn(0x10));
        m.put_checkpoint(ZERO_CHECKPOINT.clone())?;
        m.put_relmap_file(0, 111, Bytes::from(""), ctx).await?; // dummy relmapper file
        m.commit()?;
        m.commit().await?;
        let walingest = WalIngest::new(tline, Lsn(0x10), ctx).await?;

        Ok(walingest)
@@ -1208,7 +1208,9 @@ mod tests {
    #[tokio::test]
    async fn test_relsize() -> Result<()> {
        let (tenant, ctx) = TenantHarness::create("test_relsize")?.load().await;
        let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)?;
        let tline = tenant
            .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
            .await?;
        let mut walingest = init_walingest_test(&tline, &ctx).await?;

        let mut m = tline.begin_modification(Lsn(0x20));
@@ -1216,22 +1218,22 @@ mod tests {
        walingest
            .put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"), &ctx)
            .await?;
        m.commit()?;
        m.commit().await?;
        let mut m = tline.begin_modification(Lsn(0x30));
        walingest
            .put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 3"), &ctx)
            .await?;
        m.commit()?;
        m.commit().await?;
        let mut m = tline.begin_modification(Lsn(0x40));
        walingest
            .put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1 at 4"), &ctx)
            .await?;
        m.commit()?;
        m.commit().await?;
        let mut m = tline.begin_modification(Lsn(0x50));
        walingest
            .put_rel_page_image(&mut m, TESTREL_A, 2, TEST_IMG("foo blk 2 at 5"), &ctx)
            .await?;
        m.commit()?;
        m.commit().await?;

        assert_current_logical_size(&tline, Lsn(0x50));

@@ -1317,7 +1319,7 @@ mod tests {
        walingest
            .put_rel_truncation(&mut m, TESTREL_A, 2, &ctx)
            .await?;
        m.commit()?;
        m.commit().await?;
        assert_current_logical_size(&tline, Lsn(0x60));

        // Check reported size and contents after truncation
@@ -1359,7 +1361,7 @@ mod tests {
        walingest
            .put_rel_truncation(&mut m, TESTREL_A, 0, &ctx)
            .await?;
        m.commit()?;
        m.commit().await?;
        assert_eq!(
            tline
                .get_rel_size(TESTREL_A, Lsn(0x68), false, &ctx)
@@ -1372,7 +1374,7 @@ mod tests {
        walingest
            .put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1"), &ctx)
            .await?;
        m.commit()?;
        m.commit().await?;
        assert_eq!(
            tline
                .get_rel_size(TESTREL_A, Lsn(0x70), false, &ctx)
@@ -1397,7 +1399,7 @@ mod tests {
        walingest
            .put_rel_page_image(&mut m, TESTREL_A, 1500, TEST_IMG("foo blk 1500"), &ctx)
            .await?;
        m.commit()?;
        m.commit().await?;
        assert_eq!(
            tline
                .get_rel_size(TESTREL_A, Lsn(0x80), false, &ctx)
@@ -1427,14 +1429,16 @@ mod tests {
    #[tokio::test]
    async fn test_drop_extend() -> Result<()> {
        let (tenant, ctx) = TenantHarness::create("test_drop_extend")?.load().await;
        let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)?;
        let tline = tenant
            .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
            .await?;
        let mut walingest = init_walingest_test(&tline, &ctx).await?;

        let mut m = tline.begin_modification(Lsn(0x20));
        walingest
            .put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"), &ctx)
            .await?;
        m.commit()?;
        m.commit().await?;

        // Check that rel exists and size is correct
        assert_eq!(
@@ -1453,7 +1457,7 @@ mod tests {
        // Drop rel
        let mut m = tline.begin_modification(Lsn(0x30));
        walingest.put_rel_drop(&mut m, TESTREL_A, &ctx).await?;
        m.commit()?;
        m.commit().await?;

        // Check that rel is not visible anymore
        assert_eq!(
@@ -1471,7 +1475,7 @@ mod tests {
        walingest
            .put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 4"), &ctx)
            .await?;
        m.commit()?;
        m.commit().await?;

        // Check that rel exists and size is correct
        assert_eq!(
@@ -1496,7 +1500,9 @@ mod tests {
    #[tokio::test]
    async fn test_truncate_extend() -> Result<()> {
        let (tenant, ctx) = TenantHarness::create("test_truncate_extend")?.load().await;
        let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)?;
        let tline = tenant
            .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
            .await?;
        let mut walingest = init_walingest_test(&tline, &ctx).await?;

        // Create a 20 MB relation (the size is arbitrary)
@@ -1508,7 +1514,7 @@ mod tests {
                .put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data), &ctx)
                .await?;
        }
        m.commit()?;
        m.commit().await?;

        // The relation was created at LSN 20, not visible at LSN 1 yet.
        assert_eq!(
@@ -1553,7 +1559,7 @@ mod tests {
        walingest
            .put_rel_truncation(&mut m, TESTREL_A, 1, &ctx)
            .await?;
        m.commit()?;
        m.commit().await?;

        // Check reported size and contents after truncation
        assert_eq!(
@@ -1602,7 +1608,7 @@ mod tests {
                .put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data), &ctx)
                .await?;
        }
        m.commit()?;
        m.commit().await?;

        assert_eq!(
            tline
@@ -1636,7 +1642,9 @@ mod tests {
    #[tokio::test]
    async fn test_large_rel() -> Result<()> {
        let (tenant, ctx) = TenantHarness::create("test_large_rel")?.load().await;
        let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)?;
        let tline = tenant
            .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
            .await?;
        let mut walingest = init_walingest_test(&tline, &ctx).await?;

        let mut lsn = 0x10;
@@ -1647,7 +1655,7 @@ mod tests {
            walingest
                .put_rel_page_image(&mut m, TESTREL_A, blknum as BlockNumber, img, &ctx)
                .await?;
            m.commit()?;
            m.commit().await?;
        }

        assert_current_logical_size(&tline, Lsn(lsn));
@@ -1663,7 +1671,7 @@ mod tests {
        walingest
            .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE, &ctx)
            .await?;
        m.commit()?;
        m.commit().await?;
        assert_eq!(
            tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
            RELSEG_SIZE
@@ -1676,7 +1684,7 @@ mod tests {
        walingest
            .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE - 1, &ctx)
            .await?;
        m.commit()?;
        m.commit().await?;
        assert_eq!(
            tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
            RELSEG_SIZE - 1
@@ -1692,7 +1700,7 @@ mod tests {
        walingest
            .put_rel_truncation(&mut m, TESTREL_A, size as BlockNumber, &ctx)
            .await?;
        m.commit()?;
        m.commit().await?;
        assert_eq!(
            tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
            size as BlockNumber
124
poetry.lock
generated
@@ -1,9 +1,10 @@
# This file is automatically @generated by Poetry 1.5.1 and should not be changed by hand.
# This file is automatically @generated by Poetry and should not be changed by hand.

[[package]]
name = "aiohttp"
version = "3.7.4"
description = "Async http client/server framework (asyncio)"
category = "main"
optional = false
python-versions = ">=3.6"
files = [
@@ -61,6 +62,7 @@ speedups = ["aiodns", "brotlipy", "cchardet"]
name = "aiopg"
version = "1.3.4"
description = "Postgres integration with asyncio."
category = "main"
optional = false
python-versions = ">=3.6"
files = [
@@ -77,28 +79,30 @@ sa = ["sqlalchemy[postgresql-psycopg2binary] (>=1.3,<1.5)"]

[[package]]
name = "allure-pytest"
version = "2.13.1"
version = "2.13.2"
description = "Allure pytest integration"
category = "main"
optional = false
python-versions = "*"
files = [
    {file = "allure-pytest-2.13.1.tar.gz", hash = "sha256:68d69456eeb65af4061ec06a80bc941163b0616e8216554d36b070a6bf070e08"},
    {file = "allure_pytest-2.13.1-py3-none-any.whl", hash = "sha256:a8de2fc3b3effe2d8f98801646920de3f055b779710f4c806dbee7c613c24633"},
    {file = "allure-pytest-2.13.2.tar.gz", hash = "sha256:22243159e8ec81ce2b5254b4013802198821b1b42f118f69d4a289396607c7b3"},
    {file = "allure_pytest-2.13.2-py3-none-any.whl", hash = "sha256:17de9dbee7f61c8e66a5b5e818b00e419dbcea44cb55c24319401ba813220690"},
]

[package.dependencies]
allure-python-commons = "2.13.1"
allure-python-commons = "2.13.2"
pytest = ">=4.5.0"

[[package]]
name = "allure-python-commons"
version = "2.13.1"
version = "2.13.2"
description = "Common module for integrate allure with python-based frameworks"
category = "main"
optional = false
python-versions = ">=3.6"
files = [
    {file = "allure-python-commons-2.13.1.tar.gz", hash = "sha256:3fc13e1da8ebb23f9ab5c9c72ad04595023cdd5078dbb8604939997faebed5cb"},
    {file = "allure_python_commons-2.13.1-py3-none-any.whl", hash = "sha256:d08e04867bddf44fef55def3d67f4bc25af58a1bf9fcffcf4ec3331f7f2ef0d0"},
    {file = "allure-python-commons-2.13.2.tar.gz", hash = "sha256:8a03681330231b1deadd86b97ff68841c6591320114ae638570f1ed60d7a2033"},
    {file = "allure_python_commons-2.13.2-py3-none-any.whl", hash = "sha256:2bb3646ec3fbf5b36d178a5e735002bc130ae9f9ba80f080af97d368ba375051"},
]

[package.dependencies]
@@ -109,6 +113,7 @@ pluggy = ">=0.4.0"
name = "async-timeout"
version = "3.0.1"
description = "Timeout context manager for asyncio programs"
category = "main"
optional = false
python-versions = ">=3.5.3"
files = [
@@ -120,6 +125,7 @@ files = [
name = "asyncpg"
version = "0.27.0"
description = "An asyncio PostgreSQL driver"
category = "main"
optional = false
python-versions = ">=3.7.0"
files = [
@@ -170,6 +176,7 @@ test = ["flake8 (>=5.0.4,<5.1.0)", "uvloop (>=0.15.3)"]
name = "attrs"
version = "21.4.0"
description = "Classes Without Boilerplate"
category = "main"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*"
files = [
@@ -187,6 +194,7 @@ tests-no-zope = ["cloudpickle", "coverage[toml] (>=5.0.2)", "hypothesis", "mypy"
name = "aws-sam-translator"
version = "1.48.0"
description = "AWS SAM Translator is a library that transform SAM templates into AWS CloudFormation templates"
category = "main"
optional = false
python-versions = ">=3.7, <=4.0, !=4.0"
files = [
@@ -196,7 +204,7 @@ files = [
]

[package.dependencies]
boto3 = ">=1.19.5,<2.dev0"
boto3 = ">=1.19.5,<2.0.0"
jsonschema = ">=3.2,<4.0"

[package.extras]
[~90 further poetry.lock hunks omitted: regenerating the lockfile re-adds a category = "main" (or category = "dev") line to each remaining package entry, with no version changes]
@@ -2506,4 +2610,4 @@ testing = ["func-timeout", "jaraco.itertools", "pytest (>=6)", "pytest-black (>=

[metadata]
lock-version = "2.0"
python-versions = "^3.9"
content-hash = "305be93b4987509365a86e3726e22632655a3d205e47c47bcb46f8fcb7a0d284"
content-hash = "c6c217033f50430c31b0979b74db222e6bab2301abd8b9f0cce5a9d5bccc578f"
@@ -26,7 +26,7 @@ prometheus-client = "^0.14.1"
pytest-timeout = "^2.1.0"
Werkzeug = "^2.2.3"
pytest-order = "^1.1.0"
allure-pytest = "2.13.1"
allure-pytest = "^2.13.2"
pytest-asyncio = "^0.21.0"
toml = "^0.10.2"
psutil = "^5.9.4"
@@ -98,6 +98,22 @@ pub struct PhysicalStorage {
    /// - points to write_lsn, so no seek is needed for writing
    /// - doesn't point to the end of the segment
    file: Option<File>,

    /// When false, we have just initialized storage using the LSN from find_end_of_wal().
    /// In this case, [`write_lsn`] can be less than the WAL actually written on disk.
    /// In particular, there can be an unexpected .partial file.
    ///
    /// Imagine the following:
    /// - 000000010000000000000001
    ///   - it was fully written, but the last record is split between 2 segments
    ///   - after restart, find_end_of_wal() returned 0/1FFFFF0, which is at the end of this segment
    ///   - write_lsn, write_record_lsn and flush_record_lsn were initialized to 0/1FFFFF0
    /// - 000000010000000000000002.partial
    ///   - it has only 1 byte written, which is not enough to make a full WAL record
    ///
    /// Partial segment 002 has no WAL records, and it will be removed by the next truncate_wal().
    /// This flag will be set to true after the first truncate_wal() call.
    is_truncated_after_restart: bool,
}

impl PhysicalStorage {
@@ -157,6 +173,7 @@ impl PhysicalStorage {
            flush_record_lsn: flush_lsn,
            decoder: WalStreamDecoder::new(write_lsn, state.server.pg_version / 10000),
            file: None,
            is_truncated_after_restart: false,
        })
    }

@@ -381,7 +398,10 @@ impl Storage for PhysicalStorage {

        // Quick exit if nothing to do, to avoid writing up to 16 MiB of zeros on
        // disk (this happens on each connect).
        if end_pos == self.write_lsn {
        if self.is_truncated_after_restart
            && end_pos == self.write_lsn
            && end_pos == self.flush_record_lsn
        {
            return Ok(());
        }

@@ -414,6 +434,7 @@ impl Storage for PhysicalStorage {
        self.write_lsn = end_pos;
        self.write_record_lsn = end_pos;
        self.flush_record_lsn = end_pos;
        self.is_truncated_after_restart = true;
        Ok(())
    }
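A standalone sketch of the behavior these hunks produce (field and method names taken from the diff above, file I/O elided): the first truncate_wal() after restart always does the work and sets the flag, so only later no-op calls can take the quick exit.

struct PhysicalStorage {
    write_lsn: u64,
    write_record_lsn: u64,
    flush_record_lsn: u64,
    is_truncated_after_restart: bool,
}

impl PhysicalStorage {
    fn truncate_wal(&mut self, end_pos: u64) {
        if self.is_truncated_after_restart
            && end_pos == self.write_lsn
            && end_pos == self.flush_record_lsn
        {
            return; // quick exit: avoids rewriting up to 16 MiB of zeros
        }
        // ... the real implementation truncates the segment file here ...
        self.write_lsn = end_pos;
        self.write_record_lsn = end_pos;
        self.flush_record_lsn = end_pos;
        self.is_truncated_after_restart = true;
    }
}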
@@ -163,7 +163,6 @@ def test_forward_params_to_client(static_proxy: NeonProxy):
    assert conn.get_parameter_status(name) == value


@pytest.mark.timeout(5)
def test_close_on_connections_exit(static_proxy: NeonProxy):
    # Open two connections, send SIGTERM, then ensure that proxy doesn't exit
    # until after connections close.