mirror of
https://github.com/neondatabase/neon.git
synced 2026-02-10 06:00:38 +00:00
Compare commits
16 Commits
conrad/pro
...
problame/a
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7ef703c0fc | ||
|
|
cbe19e1cbb | ||
|
|
860dacf18c | ||
|
|
7a8229f3af | ||
|
|
7ef666c723 | ||
|
|
0bda9d3c94 | ||
|
|
9fe7b7a079 | ||
|
|
74a4cf0b2a | ||
|
|
a79835dbc0 | ||
|
|
1b2663350c | ||
|
|
e8ae409bdc | ||
|
|
46acdaed8d | ||
|
|
d00ad45ce7 | ||
|
|
388122fe27 | ||
|
|
3919dd7ef6 | ||
|
|
bd516a491b |
@@ -150,7 +150,7 @@ pub async fn collect_metrics_iteration(
|
||||
let mut tenant_resident_size = 0;
|
||||
|
||||
// iterate through list of timelines in tenant
|
||||
for timeline in tenant.list_timelines().iter() {
|
||||
for timeline in tenant.list_timelines().await.iter() {
|
||||
// collect per-timeline metrics only for active timelines
|
||||
if timeline.is_active() {
|
||||
let timeline_written_size = u64::from(timeline.get_last_record_lsn());
|
||||
|
||||
@@ -508,11 +508,11 @@ async fn collect_eviction_candidates(
|
||||
// a little unfair to tenants during shutdown in such a situation is tolerable.
|
||||
let mut tenant_candidates = Vec::new();
|
||||
let mut max_layer_size = 0;
|
||||
for tl in tenant.list_timelines() {
|
||||
for tl in tenant.list_timelines().await {
|
||||
if !tl.is_active() {
|
||||
continue;
|
||||
}
|
||||
let info = tl.get_local_layers_for_disk_usage_eviction();
|
||||
let info = tl.get_local_layers_for_disk_usage_eviction().await;
|
||||
debug!(tenant_id=%tl.tenant_id, timeline_id=%tl.timeline_id, "timeline resident layers count: {}", info.resident_layers.len());
|
||||
tenant_candidates.extend(
|
||||
info.resident_layers
|
||||
|
||||
@@ -175,7 +175,7 @@ async fn build_timeline_info(
|
||||
) -> anyhow::Result<TimelineInfo> {
|
||||
crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
let mut info = build_timeline_info_common(timeline, ctx)?;
|
||||
let mut info = build_timeline_info_common(timeline, ctx).await?;
|
||||
if include_non_incremental_logical_size {
|
||||
// XXX we should be using spawn_ondemand_logical_size_calculation here.
|
||||
// Otherwise, if someone deletes the timeline / detaches the tenant while
|
||||
@@ -193,7 +193,7 @@ async fn build_timeline_info(
|
||||
Ok(info)
|
||||
}
|
||||
|
||||
fn build_timeline_info_common(
|
||||
async fn build_timeline_info_common(
|
||||
timeline: &Arc<Timeline>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<TimelineInfo> {
|
||||
@@ -224,7 +224,7 @@ fn build_timeline_info_common(
|
||||
None
|
||||
}
|
||||
};
|
||||
let current_physical_size = Some(timeline.layer_size_sum());
|
||||
let current_physical_size = Some(timeline.layer_size_sum().await);
|
||||
let state = timeline.current_state();
|
||||
let remote_consistent_lsn = timeline.get_remote_consistent_lsn().unwrap_or(Lsn(0));
|
||||
|
||||
@@ -283,6 +283,7 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
|
||||
Ok(Some(new_timeline)) => {
|
||||
// Created. Construct a TimelineInfo for it.
|
||||
let timeline_info = build_timeline_info_common(&new_timeline, &ctx)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
json_response(StatusCode::CREATED, timeline_info)
|
||||
}
|
||||
@@ -304,7 +305,7 @@ async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>,
|
||||
|
||||
let response_data = async {
|
||||
let tenant = mgr::get_tenant(tenant_id, true).await?;
|
||||
let timelines = tenant.list_timelines();
|
||||
let timelines = tenant.list_timelines().await;
|
||||
|
||||
let mut response_data = Vec::with_capacity(timelines.len());
|
||||
for timeline in timelines {
|
||||
@@ -343,6 +344,7 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
|
||||
|
||||
let timeline = tenant
|
||||
.get_timeline(timeline_id, false)
|
||||
.await
|
||||
.map_err(ApiError::NotFound)?;
|
||||
|
||||
let timeline_info = build_timeline_info(
|
||||
@@ -502,8 +504,8 @@ async fn tenant_status(request: Request<Body>) -> Result<Response<Body>, ApiErro
|
||||
|
||||
// Calculate total physical size of all timelines
|
||||
let mut current_physical_size = 0;
|
||||
for timeline in tenant.list_timelines().iter() {
|
||||
current_physical_size += timeline.layer_size_sum();
|
||||
for timeline in tenant.list_timelines().await.iter() {
|
||||
current_physical_size += timeline.layer_size_sum().await;
|
||||
}
|
||||
|
||||
let state = tenant.current_state();
|
||||
@@ -608,7 +610,7 @@ async fn layer_map_info_handler(request: Request<Body>) -> Result<Response<Body>
|
||||
check_permission(&request, Some(tenant_id))?;
|
||||
|
||||
let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
|
||||
let layer_map_info = timeline.layer_map_info(reset);
|
||||
let layer_map_info = timeline.layer_map_info(reset).await;
|
||||
|
||||
json_response(StatusCode::OK, layer_map_info)
|
||||
}
|
||||
@@ -956,6 +958,7 @@ async fn active_timeline_of_active_tenant(
|
||||
let tenant = mgr::get_tenant(tenant_id, true).await?;
|
||||
tenant
|
||||
.get_timeline(timeline_id, true)
|
||||
.await
|
||||
.map_err(ApiError::NotFound)
|
||||
}
|
||||
|
||||
|
||||
@@ -75,12 +75,12 @@ pub async fn import_timeline_from_postgres_datadir(
|
||||
{
|
||||
pg_control = Some(control_file);
|
||||
}
|
||||
modification.flush()?;
|
||||
modification.flush().await?;
|
||||
}
|
||||
}
|
||||
|
||||
// We're done importing all the data files.
|
||||
modification.commit()?;
|
||||
modification.commit().await?;
|
||||
|
||||
// We expect the Postgres server to be shut down cleanly.
|
||||
let pg_control = pg_control.context("pg_control file not found")?;
|
||||
@@ -359,7 +359,7 @@ pub async fn import_basebackup_from_tar(
|
||||
// We found the pg_control file.
|
||||
pg_control = Some(res);
|
||||
}
|
||||
modification.flush()?;
|
||||
modification.flush().await?;
|
||||
}
|
||||
tokio_tar::EntryType::Directory => {
|
||||
debug!("directory {:?}", file_path);
|
||||
@@ -377,7 +377,7 @@ pub async fn import_basebackup_from_tar(
|
||||
// sanity check: ensure that pg_control is loaded
|
||||
let _pg_control = pg_control.context("pg_control file not found")?;
|
||||
|
||||
modification.commit()?;
|
||||
modification.commit().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -594,7 +594,7 @@ async fn import_file(
|
||||
// zenith.signal is not necessarily the last file, that we handle
|
||||
// but it is ok to call `finish_write()`, because final `modification.commit()`
|
||||
// will update lsn once more to the final one.
|
||||
let writer = modification.tline.writer();
|
||||
let writer = modification.tline.writer().await;
|
||||
writer.finish_write(prev_lsn);
|
||||
|
||||
debug!("imported zenith signal {}", prev_lsn);
|
||||
|
||||
@@ -376,7 +376,7 @@ impl PageServerHandler {
|
||||
};
|
||||
|
||||
// Check that the timeline exists
|
||||
let timeline = tenant.get_timeline(timeline_id, true)?;
|
||||
let timeline = tenant.get_timeline(timeline_id, true).await?;
|
||||
|
||||
// switch client to COPYBOTH
|
||||
pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
|
||||
@@ -475,7 +475,9 @@ impl PageServerHandler {
|
||||
// Create empty timeline
|
||||
info!("creating new timeline");
|
||||
let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?;
|
||||
let timeline = tenant.create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)?;
|
||||
let timeline = tenant
|
||||
.create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)
|
||||
.await?;
|
||||
|
||||
// TODO mark timeline as not ready until it reaches end_lsn.
|
||||
// We might have some wal to import as well, and we should prevent compute
|
||||
@@ -1184,6 +1186,6 @@ async fn get_active_tenant_timeline(
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Arc<Timeline>, GetActiveTenantError> {
|
||||
let tenant = get_active_tenant_with_timeout(tenant_id, ctx).await?;
|
||||
let timeline = tenant.get_timeline(timeline_id, true)?;
|
||||
let timeline = tenant.get_timeline(timeline_id, true).await?;
|
||||
Ok(timeline)
|
||||
}
|
||||
|
||||
@@ -1108,7 +1108,7 @@ impl<'a> DatadirModification<'a> {
|
||||
/// retains all the metadata, but data pages are flushed. That's again OK
|
||||
/// for bulk import, where you are just loading data pages and won't try to
|
||||
/// modify the same pages twice.
|
||||
pub fn flush(&mut self) -> anyhow::Result<()> {
|
||||
pub async fn flush(&mut self) -> anyhow::Result<()> {
|
||||
// Unless we have accumulated a decent amount of changes, it's not worth it
|
||||
// to scan through the pending_updates list.
|
||||
let pending_nblocks = self.pending_nblocks;
|
||||
@@ -1116,13 +1116,15 @@ impl<'a> DatadirModification<'a> {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let writer = self.tline.writer();
|
||||
let writer = self.tline.writer().await;
|
||||
|
||||
let mut layer_map = self.tline.layers.write().await;
|
||||
|
||||
// Flush relation and SLRU data blocks, keep metadata.
|
||||
let mut result: anyhow::Result<()> = Ok(());
|
||||
self.pending_updates.retain(|&key, value| {
|
||||
if result.is_ok() && (is_rel_block_key(key) || is_slru_block_key(key)) {
|
||||
result = writer.put(key, self.lsn, value);
|
||||
result = writer.put_locked(key, self.lsn, value, &mut layer_map);
|
||||
false
|
||||
} else {
|
||||
true
|
||||
@@ -1143,17 +1145,17 @@ impl<'a> DatadirModification<'a> {
|
||||
/// underlying timeline.
|
||||
/// All the modifications in this atomic update are stamped by the specified LSN.
|
||||
///
|
||||
pub fn commit(&mut self) -> anyhow::Result<()> {
|
||||
let writer = self.tline.writer();
|
||||
pub async fn commit(&mut self) -> anyhow::Result<()> {
|
||||
let writer = self.tline.writer().await;
|
||||
let lsn = self.lsn;
|
||||
let pending_nblocks = self.pending_nblocks;
|
||||
self.pending_nblocks = 0;
|
||||
|
||||
for (key, value) in self.pending_updates.drain() {
|
||||
writer.put(key, lsn, &value)?;
|
||||
writer.put(key, lsn, &value).await?;
|
||||
}
|
||||
for key_range in self.pending_deletions.drain(..) {
|
||||
writer.delete(key_range, lsn)?;
|
||||
writer.delete(key_range, lsn).await?;
|
||||
}
|
||||
|
||||
writer.finish_write(lsn);
|
||||
@@ -1594,18 +1596,20 @@ fn is_slru_block_key(key: Key) -> bool {
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn create_test_timeline(
|
||||
pub async fn create_test_timeline(
|
||||
tenant: &crate::tenant::Tenant,
|
||||
timeline_id: utils::id::TimelineId,
|
||||
pg_version: u32,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<std::sync::Arc<Timeline>> {
|
||||
let tline = tenant
|
||||
.create_empty_timeline(timeline_id, Lsn(8), pg_version, ctx)?
|
||||
.initialize(ctx)?;
|
||||
.create_empty_timeline(timeline_id, Lsn(8), pg_version, ctx)
|
||||
.await?
|
||||
.initialize(ctx)
|
||||
.await?;
|
||||
let mut m = tline.begin_modification(Lsn(8));
|
||||
m.init_empty()?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
Ok(tline)
|
||||
}
|
||||
|
||||
@@ -1632,7 +1636,7 @@ mod tests {
|
||||
#[test]
|
||||
fn test_list_rels_drop() -> Result<()> {
|
||||
let repo = RepoHarness::create("test_list_rels_drop")?.load();
|
||||
let tline = create_empty_timeline(repo, TIMELINE_ID)?;
|
||||
let tline = create_empty_timeline(repo, TIMELINE_ID).await?;
|
||||
const TESTDB: u32 = 111;
|
||||
|
||||
// Import initial dummy checkpoint record, otherwise the get_timeline() call
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -316,7 +316,7 @@ pub async fn set_new_tenant_config(
|
||||
new_tenant_conf,
|
||||
false,
|
||||
)?;
|
||||
tenant.set_new_tenant_config(new_tenant_conf);
|
||||
tenant.set_new_tenant_config(new_tenant_conf).await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -677,6 +677,7 @@ pub async fn immediate_compact(
|
||||
|
||||
let timeline = tenant
|
||||
.get_timeline(timeline_id, true)
|
||||
.await
|
||||
.map_err(ApiError::NotFound)?;
|
||||
|
||||
// Run in task_mgr to avoid race with tenant_detach operation
|
||||
|
||||
@@ -1264,9 +1264,13 @@ mod tests {
|
||||
let harness = TenantHarness::create(test_name)?;
|
||||
let (tenant, ctx) = runtime.block_on(harness.load());
|
||||
// create an empty timeline directory
|
||||
let timeline =
|
||||
tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let _ = timeline.initialize(&ctx).unwrap();
|
||||
runtime.block_on(async {
|
||||
let timeline = tenant
|
||||
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
|
||||
.await?;
|
||||
let _ = timeline.initialize(&ctx).await?;
|
||||
anyhow::Ok(())
|
||||
})?;
|
||||
|
||||
let remote_fs_dir = harness.conf.workdir.join("remote_fs");
|
||||
std::fs::create_dir_all(remote_fs_dir)?;
|
||||
|
||||
@@ -136,7 +136,7 @@ pub(super) async fn gather_inputs(
|
||||
.context("Failed to refresh gc_info before gathering inputs")?;
|
||||
|
||||
// Collect information about all the timelines
|
||||
let mut timelines = tenant.list_timelines();
|
||||
let mut timelines = tenant.list_timelines().await;
|
||||
|
||||
if timelines.is_empty() {
|
||||
// perhaps the tenant has just been created, and as such doesn't have any data yet
|
||||
|
||||
@@ -22,6 +22,7 @@ use pageserver_api::models::{
|
||||
};
|
||||
use std::ops::Range;
|
||||
use std::path::PathBuf;
|
||||
use std::pin::Pin;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||
use tracing::warn;
|
||||
@@ -335,6 +336,13 @@ impl LayerAccessStats {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) type GetValueReconstructFuture = Pin<
|
||||
Box<
|
||||
dyn Send
|
||||
+ std::future::Future<Output = Result<(ValueReconstructState, ValueReconstructResult)>>,
|
||||
>,
|
||||
>;
|
||||
|
||||
/// Supertrait of the [`Layer`] trait that captures the bare minimum interface
|
||||
/// required by [`LayerMap`].
|
||||
///
|
||||
@@ -372,12 +380,12 @@ pub trait Layer: std::fmt::Debug + Send + Sync {
|
||||
/// the predecessor layer and call again with the same 'reconstruct_data' to
|
||||
/// collect more data.
|
||||
fn get_value_reconstruct_data(
|
||||
&self,
|
||||
self: Arc<Self>,
|
||||
key: Key,
|
||||
lsn_range: Range<Lsn>,
|
||||
reconstruct_data: &mut ValueReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<ValueReconstructResult>;
|
||||
reconstruct_data: ValueReconstructState,
|
||||
ctx: RequestContext,
|
||||
) -> GetValueReconstructFuture;
|
||||
|
||||
/// A short ID string that uniquely identifies the given layer within a [`LayerMap`].
|
||||
fn short_id(&self) -> String;
|
||||
@@ -486,12 +494,12 @@ impl Layer for LayerDescriptor {
|
||||
}
|
||||
|
||||
fn get_value_reconstruct_data(
|
||||
&self,
|
||||
self: Arc<Self>,
|
||||
_key: Key,
|
||||
_lsn_range: Range<Lsn>,
|
||||
_reconstruct_data: &mut ValueReconstructState,
|
||||
_ctx: &RequestContext,
|
||||
) -> Result<ValueReconstructResult> {
|
||||
_reconstruct_data: ValueReconstructState,
|
||||
_ctx: RequestContext,
|
||||
) -> GetValueReconstructFuture {
|
||||
todo!("This method shouldn't be part of the Layer trait")
|
||||
}
|
||||
|
||||
|
||||
@@ -46,7 +46,7 @@ use std::io::{Seek, SeekFrom};
|
||||
use std::ops::Range;
|
||||
use std::os::unix::fs::FileExt;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||
use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||
use tracing::*;
|
||||
|
||||
use utils::{
|
||||
@@ -56,8 +56,8 @@ use utils::{
|
||||
};
|
||||
|
||||
use super::{
|
||||
DeltaFileName, Layer, LayerAccessStats, LayerAccessStatsReset, LayerFileName, LayerIter,
|
||||
LayerKeyIter, PathOrConf,
|
||||
DeltaFileName, GetValueReconstructFuture, Layer, LayerAccessStats, LayerAccessStatsReset,
|
||||
LayerFileName, LayerIter, LayerKeyIter, PathOrConf,
|
||||
};
|
||||
|
||||
///
|
||||
@@ -318,89 +318,94 @@ impl Layer for DeltaLayer {
|
||||
}
|
||||
|
||||
fn get_value_reconstruct_data(
|
||||
&self,
|
||||
self: Arc<Self>,
|
||||
key: Key,
|
||||
lsn_range: Range<Lsn>,
|
||||
reconstruct_state: &mut ValueReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<ValueReconstructResult> {
|
||||
ensure!(lsn_range.start >= self.lsn_range.start);
|
||||
let mut need_image = true;
|
||||
mut reconstruct_state: ValueReconstructState,
|
||||
ctx: RequestContext,
|
||||
) -> GetValueReconstructFuture {
|
||||
Box::pin(async move {
|
||||
tokio::task::spawn_blocking(move || {
|
||||
ensure!(lsn_range.start >= self.lsn_range.start);
|
||||
let mut need_image = true;
|
||||
|
||||
ensure!(self.key_range.contains(&key));
|
||||
ensure!(self.key_range.contains(&key));
|
||||
|
||||
{
|
||||
// Open the file and lock the metadata in memory
|
||||
let inner = self.load(LayerAccessKind::GetValueReconstructData, ctx)?;
|
||||
{
|
||||
// Open the file and lock the metadata in memory
|
||||
let inner = self.load(LayerAccessKind::GetValueReconstructData, &ctx)?;
|
||||
|
||||
// Scan the page versions backwards, starting from `lsn`.
|
||||
let file = inner.file.as_ref().unwrap();
|
||||
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
|
||||
inner.index_start_blk,
|
||||
inner.index_root_blk,
|
||||
file,
|
||||
);
|
||||
let search_key = DeltaKey::from_key_lsn(&key, Lsn(lsn_range.end.0 - 1));
|
||||
// Scan the page versions backwards, starting from `lsn`.
|
||||
let file = inner.file.as_ref().unwrap();
|
||||
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
|
||||
inner.index_start_blk,
|
||||
inner.index_root_blk,
|
||||
file,
|
||||
);
|
||||
let search_key = DeltaKey::from_key_lsn(&key, Lsn(lsn_range.end.0 - 1));
|
||||
|
||||
let mut offsets: Vec<(Lsn, u64)> = Vec::new();
|
||||
let mut offsets: Vec<(Lsn, u64)> = Vec::new();
|
||||
|
||||
tree_reader.visit(&search_key.0, VisitDirection::Backwards, |key, value| {
|
||||
let blob_ref = BlobRef(value);
|
||||
if key[..KEY_SIZE] != search_key.0[..KEY_SIZE] {
|
||||
return false;
|
||||
}
|
||||
let entry_lsn = DeltaKey::extract_lsn_from_buf(key);
|
||||
if entry_lsn < lsn_range.start {
|
||||
return false;
|
||||
}
|
||||
offsets.push((entry_lsn, blob_ref.pos()));
|
||||
tree_reader.visit(&search_key.0, VisitDirection::Backwards, |key, value| {
|
||||
let blob_ref = BlobRef(value);
|
||||
if key[..KEY_SIZE] != search_key.0[..KEY_SIZE] {
|
||||
return false;
|
||||
}
|
||||
let entry_lsn = DeltaKey::extract_lsn_from_buf(key);
|
||||
if entry_lsn < lsn_range.start {
|
||||
return false;
|
||||
}
|
||||
offsets.push((entry_lsn, blob_ref.pos()));
|
||||
|
||||
!blob_ref.will_init()
|
||||
})?;
|
||||
!blob_ref.will_init()
|
||||
})?;
|
||||
|
||||
// Ok, 'offsets' now contains the offsets of all the entries we need to read
|
||||
let mut cursor = file.block_cursor();
|
||||
let mut buf = Vec::new();
|
||||
for (entry_lsn, pos) in offsets {
|
||||
cursor.read_blob_into_buf(pos, &mut buf).with_context(|| {
|
||||
format!(
|
||||
"Failed to read blob from virtual file {}",
|
||||
file.file.path.display()
|
||||
)
|
||||
})?;
|
||||
let val = Value::des(&buf).with_context(|| {
|
||||
format!(
|
||||
"Failed to deserialize file blob from virtual file {}",
|
||||
file.file.path.display()
|
||||
)
|
||||
})?;
|
||||
match val {
|
||||
Value::Image(img) => {
|
||||
reconstruct_state.img = Some((entry_lsn, img));
|
||||
need_image = false;
|
||||
break;
|
||||
}
|
||||
Value::WalRecord(rec) => {
|
||||
let will_init = rec.will_init();
|
||||
reconstruct_state.records.push((entry_lsn, rec));
|
||||
if will_init {
|
||||
// This WAL record initializes the page, so no need to go further back
|
||||
need_image = false;
|
||||
break;
|
||||
// Ok, 'offsets' now contains the offsets of all the entries we need to read
|
||||
let mut cursor = file.block_cursor();
|
||||
let mut buf = Vec::new();
|
||||
for (entry_lsn, pos) in offsets {
|
||||
cursor.read_blob_into_buf(pos, &mut buf).with_context(|| {
|
||||
format!(
|
||||
"Failed to read blob from virtual file {}",
|
||||
file.file.path.display()
|
||||
)
|
||||
})?;
|
||||
let val = Value::des(&buf).with_context(|| {
|
||||
format!(
|
||||
"Failed to deserialize file blob from virtual file {}",
|
||||
file.file.path.display()
|
||||
)
|
||||
})?;
|
||||
match val {
|
||||
Value::Image(img) => {
|
||||
reconstruct_state.img = Some((entry_lsn, img));
|
||||
need_image = false;
|
||||
break;
|
||||
}
|
||||
Value::WalRecord(rec) => {
|
||||
let will_init = rec.will_init();
|
||||
reconstruct_state.records.push((entry_lsn, rec));
|
||||
if will_init {
|
||||
// This WAL record initializes the page, so no need to go further back
|
||||
need_image = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// release metadata lock and close the file
|
||||
}
|
||||
}
|
||||
// release metadata lock and close the file
|
||||
}
|
||||
|
||||
// If an older page image is needed to reconstruct the page, let the
|
||||
// caller know.
|
||||
if need_image {
|
||||
Ok(ValueReconstructResult::Continue)
|
||||
} else {
|
||||
Ok(ValueReconstructResult::Complete)
|
||||
}
|
||||
// If an older page image is needed to reconstruct the page, let the
|
||||
// caller know.
|
||||
if need_image {
|
||||
Ok((reconstruct_state, ValueReconstructResult::Continue))
|
||||
} else {
|
||||
Ok((reconstruct_state, ValueReconstructResult::Complete))
|
||||
}
|
||||
})
|
||||
.await
|
||||
.context("spawn_blocking")?
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -43,7 +43,7 @@ use std::io::{Seek, SeekFrom};
|
||||
use std::ops::Range;
|
||||
use std::os::unix::prelude::FileExt;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::{RwLock, RwLockReadGuard};
|
||||
use std::sync::{Arc, RwLock, RwLockReadGuard};
|
||||
use tracing::*;
|
||||
|
||||
use utils::{
|
||||
@@ -53,7 +53,7 @@ use utils::{
|
||||
};
|
||||
|
||||
use super::filename::{ImageFileName, LayerFileName};
|
||||
use super::{Layer, LayerAccessStatsReset, LayerIter, PathOrConf};
|
||||
use super::{GetValueReconstructFuture, Layer, LayerAccessStatsReset, LayerIter, PathOrConf};
|
||||
|
||||
///
|
||||
/// Header stored in the beginning of the file
|
||||
@@ -197,38 +197,45 @@ impl Layer for ImageLayer {
|
||||
|
||||
/// Look up given page in the file
|
||||
fn get_value_reconstruct_data(
|
||||
&self,
|
||||
self: Arc<Self>,
|
||||
key: Key,
|
||||
lsn_range: Range<Lsn>,
|
||||
reconstruct_state: &mut ValueReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<ValueReconstructResult> {
|
||||
assert!(self.key_range.contains(&key));
|
||||
assert!(lsn_range.start >= self.lsn);
|
||||
assert!(lsn_range.end >= self.lsn);
|
||||
mut reconstruct_state: ValueReconstructState,
|
||||
ctx: RequestContext,
|
||||
) -> GetValueReconstructFuture {
|
||||
Box::pin(async move {
|
||||
tokio::task::spawn_blocking(move || {
|
||||
assert!(self.key_range.contains(&key));
|
||||
assert!(lsn_range.start >= self.lsn);
|
||||
assert!(lsn_range.end >= self.lsn);
|
||||
|
||||
let inner = self.load(LayerAccessKind::GetValueReconstructData, ctx)?;
|
||||
let inner = self.load(LayerAccessKind::GetValueReconstructData, &ctx)?;
|
||||
|
||||
let file = inner.file.as_ref().unwrap();
|
||||
let tree_reader = DiskBtreeReader::new(inner.index_start_blk, inner.index_root_blk, file);
|
||||
let file = inner.file.as_ref().unwrap();
|
||||
let tree_reader =
|
||||
DiskBtreeReader::new(inner.index_start_blk, inner.index_root_blk, file);
|
||||
|
||||
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
|
||||
key.write_to_byte_slice(&mut keybuf);
|
||||
if let Some(offset) = tree_reader.get(&keybuf)? {
|
||||
let blob = file.block_cursor().read_blob(offset).with_context(|| {
|
||||
format!(
|
||||
"failed to read value from data file {} at offset {}",
|
||||
self.path().display(),
|
||||
offset
|
||||
)
|
||||
})?;
|
||||
let value = Bytes::from(blob);
|
||||
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
|
||||
key.write_to_byte_slice(&mut keybuf);
|
||||
if let Some(offset) = tree_reader.get(&keybuf)? {
|
||||
let blob = file.block_cursor().read_blob(offset).with_context(|| {
|
||||
format!(
|
||||
"failed to read value from data file {} at offset {}",
|
||||
self.path().display(),
|
||||
offset
|
||||
)
|
||||
})?;
|
||||
let value = Bytes::from(blob);
|
||||
|
||||
reconstruct_state.img = Some((self.lsn, value));
|
||||
Ok(ValueReconstructResult::Complete)
|
||||
} else {
|
||||
Ok(ValueReconstructResult::Missing)
|
||||
}
|
||||
reconstruct_state.img = Some((self.lsn, value));
|
||||
Ok((reconstruct_state, ValueReconstructResult::Complete))
|
||||
} else {
|
||||
Ok((reconstruct_state, ValueReconstructResult::Missing))
|
||||
}
|
||||
})
|
||||
.await
|
||||
.context("spawn_blocking")?
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -12,7 +12,7 @@ use crate::tenant::block_io::BlockReader;
|
||||
use crate::tenant::ephemeral_file::EphemeralFile;
|
||||
use crate::tenant::storage_layer::{ValueReconstructResult, ValueReconstructState};
|
||||
use crate::walrecord;
|
||||
use anyhow::{ensure, Result};
|
||||
use anyhow::{ensure, Context, Result};
|
||||
use pageserver_api::models::InMemoryLayerInfo;
|
||||
use std::cell::RefCell;
|
||||
use std::collections::HashMap;
|
||||
@@ -27,9 +27,9 @@ use utils::{
|
||||
// while being able to use std::fmt::Write's methods
|
||||
use std::fmt::Write as _;
|
||||
use std::ops::Range;
|
||||
use std::sync::RwLock;
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
||||
use super::{DeltaLayer, DeltaLayerWriter, Layer};
|
||||
use super::{DeltaLayer, DeltaLayerWriter, GetValueReconstructFuture, Layer};
|
||||
|
||||
thread_local! {
|
||||
/// A buffer for serializing object during [`InMemoryLayer::put_value`].
|
||||
@@ -191,52 +191,60 @@ impl Layer for InMemoryLayer {
|
||||
|
||||
/// Look up given value in the layer.
|
||||
fn get_value_reconstruct_data(
|
||||
&self,
|
||||
self: Arc<Self>,
|
||||
key: Key,
|
||||
lsn_range: Range<Lsn>,
|
||||
reconstruct_state: &mut ValueReconstructState,
|
||||
_ctx: &RequestContext,
|
||||
) -> anyhow::Result<ValueReconstructResult> {
|
||||
ensure!(lsn_range.start >= self.start_lsn);
|
||||
let mut need_image = true;
|
||||
mut reconstruct_state: ValueReconstructState,
|
||||
_ctx: RequestContext,
|
||||
) -> GetValueReconstructFuture {
|
||||
Box::pin(async move {
|
||||
// The in-memory layer isn't actually in-memory. It uses EphemeralFile.
|
||||
// So, this does do IO.
|
||||
tokio::task::spawn_blocking(move || {
|
||||
ensure!(lsn_range.start >= self.start_lsn);
|
||||
let mut need_image = true;
|
||||
|
||||
let inner = self.inner.read().unwrap();
|
||||
let inner = self.inner.read().unwrap();
|
||||
|
||||
let mut reader = inner.file.block_cursor();
|
||||
let mut reader = inner.file.block_cursor();
|
||||
|
||||
// Scan the page versions backwards, starting from `lsn`.
|
||||
if let Some(vec_map) = inner.index.get(&key) {
|
||||
let slice = vec_map.slice_range(lsn_range);
|
||||
for (entry_lsn, pos) in slice.iter().rev() {
|
||||
let buf = reader.read_blob(*pos)?;
|
||||
let value = Value::des(&buf)?;
|
||||
match value {
|
||||
Value::Image(img) => {
|
||||
reconstruct_state.img = Some((*entry_lsn, img));
|
||||
return Ok(ValueReconstructResult::Complete);
|
||||
}
|
||||
Value::WalRecord(rec) => {
|
||||
let will_init = rec.will_init();
|
||||
reconstruct_state.records.push((*entry_lsn, rec));
|
||||
if will_init {
|
||||
// This WAL record initializes the page, so no need to go further back
|
||||
need_image = false;
|
||||
break;
|
||||
// Scan the page versions backwards, starting from `lsn`.
|
||||
if let Some(vec_map) = inner.index.get(&key) {
|
||||
let slice = vec_map.slice_range(lsn_range);
|
||||
for (entry_lsn, pos) in slice.iter().rev() {
|
||||
let buf = reader.read_blob(*pos)?;
|
||||
let value = Value::des(&buf)?;
|
||||
match value {
|
||||
Value::Image(img) => {
|
||||
reconstruct_state.img = Some((*entry_lsn, img));
|
||||
return Ok((reconstruct_state, ValueReconstructResult::Complete));
|
||||
}
|
||||
Value::WalRecord(rec) => {
|
||||
let will_init = rec.will_init();
|
||||
reconstruct_state.records.push((*entry_lsn, rec));
|
||||
if will_init {
|
||||
// This WAL record initializes the page, so no need to go further back
|
||||
need_image = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// release lock on 'inner'
|
||||
// release lock on 'inner'
|
||||
|
||||
// If an older page image is needed to reconstruct the page, let the
|
||||
// caller know.
|
||||
if need_image {
|
||||
Ok(ValueReconstructResult::Continue)
|
||||
} else {
|
||||
Ok(ValueReconstructResult::Complete)
|
||||
}
|
||||
// If an older page image is needed to reconstruct the page, let the
|
||||
// caller know.
|
||||
if need_image {
|
||||
Ok((reconstruct_state, ValueReconstructResult::Continue))
|
||||
} else {
|
||||
Ok((reconstruct_state, ValueReconstructResult::Complete))
|
||||
}
|
||||
})
|
||||
.await
|
||||
.context("spawn_blocking")?
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -304,7 +312,7 @@ impl InMemoryLayer {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn put_tombstone(&self, _key_range: Range<Key>, _lsn: Lsn) -> Result<()> {
|
||||
pub async fn put_tombstone(&self, _key_range: Range<Key>, _lsn: Lsn) -> Result<()> {
|
||||
// TODO: Currently, we just leak the storage for any deleted keys
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -6,7 +6,7 @@ use crate::context::RequestContext;
|
||||
use crate::repository::Key;
|
||||
use crate::tenant::layer_map::BatchedUpdates;
|
||||
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
|
||||
use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
|
||||
use crate::tenant::storage_layer::{Layer, ValueReconstructState};
|
||||
use anyhow::{bail, Result};
|
||||
use pageserver_api::models::HistoricLayerInfo;
|
||||
use std::ops::Range;
|
||||
@@ -21,8 +21,8 @@ use utils::{
|
||||
use super::filename::{DeltaFileName, ImageFileName, LayerFileName};
|
||||
use super::image_layer::ImageLayer;
|
||||
use super::{
|
||||
DeltaLayer, LayerAccessStats, LayerAccessStatsReset, LayerIter, LayerKeyIter,
|
||||
LayerResidenceStatus, PersistentLayer,
|
||||
DeltaLayer, GetValueReconstructFuture, LayerAccessStats, LayerAccessStatsReset, LayerIter,
|
||||
LayerKeyIter, LayerResidenceStatus, PersistentLayer,
|
||||
};
|
||||
|
||||
/// RemoteLayer is a not yet downloaded [`ImageLayer`] or
|
||||
@@ -83,16 +83,18 @@ impl Layer for RemoteLayer {
|
||||
}
|
||||
|
||||
fn get_value_reconstruct_data(
|
||||
&self,
|
||||
self: Arc<Self>,
|
||||
_key: Key,
|
||||
_lsn_range: Range<Lsn>,
|
||||
_reconstruct_state: &mut ValueReconstructState,
|
||||
_ctx: &RequestContext,
|
||||
) -> Result<ValueReconstructResult> {
|
||||
bail!(
|
||||
"layer {} needs to be downloaded",
|
||||
self.filename().file_name()
|
||||
);
|
||||
_reconstruct_state: ValueReconstructState,
|
||||
_ctx: RequestContext,
|
||||
) -> GetValueReconstructFuture {
|
||||
Box::pin(async move {
|
||||
bail!(
|
||||
"layer {} needs to be downloaded",
|
||||
self.filename().file_name()
|
||||
);
|
||||
})
|
||||
}
|
||||
|
||||
fn is_incremental(&self) -> bool {
|
||||
|
||||
@@ -29,7 +29,7 @@ use std::ops::{Deref, Range};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::pin::pin;
|
||||
use std::sync::atomic::{AtomicI64, Ordering as AtomicOrdering};
|
||||
use std::sync::{Arc, Mutex, MutexGuard, RwLock, Weak};
|
||||
use std::sync::{Arc, Mutex, RwLock, Weak};
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
|
||||
use crate::broker_client::{get_broker_client, is_broker_client_initialized};
|
||||
@@ -121,7 +121,7 @@ pub struct Timeline {
|
||||
|
||||
pub pg_version: u32,
|
||||
|
||||
pub(super) layers: RwLock<LayerMap<dyn PersistentLayer>>,
|
||||
pub(crate) layers: tokio::sync::RwLock<LayerMap<dyn PersistentLayer>>,
|
||||
|
||||
last_freeze_at: AtomicLsn,
|
||||
// Atomic would be more appropriate here.
|
||||
@@ -170,7 +170,7 @@ pub struct Timeline {
|
||||
/// Locked automatically by [`TimelineWriter`] and checkpointer.
|
||||
/// Must always be acquired before the layer map/individual layer lock
|
||||
/// to avoid deadlock.
|
||||
write_lock: Mutex<()>,
|
||||
write_lock: tokio::sync::Mutex<()>,
|
||||
|
||||
/// Used to avoid multiple `flush_loop` tasks running
|
||||
flush_loop_state: Mutex<FlushLoopState>,
|
||||
@@ -229,6 +229,8 @@ pub struct Timeline {
|
||||
eviction_task_timeline_state: tokio::sync::Mutex<EvictionTaskTimelineState>,
|
||||
}
|
||||
|
||||
type LayerMapWriteLockGuard<'t> = tokio::sync::RwLockWriteGuard<'t, LayerMap<dyn PersistentLayer>>;
|
||||
|
||||
/// Internal structure to hold all data needed for logical size calculation.
|
||||
///
|
||||
/// Calculation consists of two stages:
|
||||
@@ -521,12 +523,13 @@ impl Timeline {
|
||||
None => None,
|
||||
};
|
||||
|
||||
let mut reconstruct_state = ValueReconstructState {
|
||||
let reconstruct_state = ValueReconstructState {
|
||||
records: Vec::new(),
|
||||
img: cached_page_img,
|
||||
};
|
||||
|
||||
self.get_reconstruct_data(key, lsn, &mut reconstruct_state, ctx)
|
||||
let reconstruct_state = self
|
||||
.get_reconstruct_data(key, lsn, reconstruct_state, ctx)
|
||||
.await?;
|
||||
|
||||
self.metrics
|
||||
@@ -563,8 +566,8 @@ impl Timeline {
|
||||
/// The sum of the file size of all historic layers in the layer map.
|
||||
/// This method makes no distinction between local and remote layers.
|
||||
/// Hence, the result **does not represent local filesystem usage**.
|
||||
pub fn layer_size_sum(&self) -> u64 {
|
||||
let layer_map = self.layers.read().unwrap();
|
||||
pub async fn layer_size_sum(&self) -> u64 {
|
||||
let layer_map = self.layers.read().await;
|
||||
let mut size = 0;
|
||||
for l in layer_map.iter_historic_layers() {
|
||||
size += l.file_size();
|
||||
@@ -645,7 +648,7 @@ impl Timeline {
|
||||
/// Flush to disk all data that was written with the put_* functions
|
||||
#[instrument(skip(self), fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id))]
|
||||
pub async fn freeze_and_flush(&self) -> anyhow::Result<()> {
|
||||
self.freeze_inmem_layer(false);
|
||||
self.freeze_inmem_layer(false).await;
|
||||
self.flush_frozen_layers_and_wait().await
|
||||
}
|
||||
|
||||
@@ -825,10 +828,10 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Mutate the timeline with a [`TimelineWriter`].
|
||||
pub fn writer(&self) -> TimelineWriter<'_> {
|
||||
pub async fn writer(&self) -> TimelineWriter<'_> {
|
||||
TimelineWriter {
|
||||
tl: self,
|
||||
_write_guard: self.write_lock.lock().unwrap(),
|
||||
_write_guard: self.write_lock.lock().await,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -862,9 +865,9 @@ impl Timeline {
|
||||
///
|
||||
/// Also flush after a period of time without new data -- it helps
|
||||
/// safekeepers to regard pageserver as caught up and suspend activity.
|
||||
pub fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
|
||||
pub async fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
|
||||
let last_lsn = self.get_last_record_lsn();
|
||||
let layers = self.layers.read().unwrap();
|
||||
let layers = self.layers.read().await;
|
||||
if let Some(open_layer) = &layers.open_layer {
|
||||
let open_layer_size = open_layer.size()?;
|
||||
drop(layers);
|
||||
@@ -886,7 +889,7 @@ impl Timeline {
|
||||
last_freeze_ts.elapsed()
|
||||
);
|
||||
|
||||
self.freeze_inmem_layer(true);
|
||||
self.freeze_inmem_layer(true).await;
|
||||
self.last_freeze_at.store(last_lsn);
|
||||
*(self.last_freeze_ts.write().unwrap()) = Instant::now();
|
||||
|
||||
@@ -968,8 +971,8 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo {
|
||||
let layer_map = self.layers.read().unwrap();
|
||||
pub async fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo {
|
||||
let layer_map = self.layers.read().await;
|
||||
let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1);
|
||||
if let Some(open_layer) = &layer_map.open_layer {
|
||||
in_memory_layers.push(open_layer.info());
|
||||
@@ -991,7 +994,7 @@ impl Timeline {
|
||||
|
||||
#[instrument(skip_all, fields(tenant = %self.tenant_id, timeline = %self.timeline_id))]
|
||||
pub async fn download_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
|
||||
let Some(layer) = self.find_layer(layer_file_name) else { return Ok(None) };
|
||||
let Some(layer) = self.find_layer(layer_file_name).await else { return Ok(None) };
|
||||
let Some(remote_layer) = layer.downcast_remote_layer() else { return Ok(Some(false)) };
|
||||
if self.remote_client.is_none() {
|
||||
return Ok(Some(false));
|
||||
@@ -1004,7 +1007,7 @@ impl Timeline {
|
||||
/// Like [`evict_layer_batch`], but for just one layer.
|
||||
/// Additional case `Ok(None)` covers the case where the layer could not be found by its `layer_file_name`.
|
||||
pub async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
|
||||
let Some(local_layer) = self.find_layer(layer_file_name) else { return Ok(None) };
|
||||
let Some(local_layer) = self.find_layer(layer_file_name).await else { return Ok(None) };
|
||||
let remote_client = self
|
||||
.remote_client
|
||||
.as_ref()
|
||||
@@ -1089,7 +1092,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
// start the batch update
|
||||
let mut layer_map = self.layers.write().unwrap();
|
||||
let mut layer_map = self.layers.write().await;
|
||||
let mut batch_updates = layer_map.batch_update();
|
||||
|
||||
let mut results = Vec::with_capacity(layers_to_evict.len());
|
||||
@@ -1353,7 +1356,7 @@ impl Timeline {
|
||||
timeline_id,
|
||||
tenant_id,
|
||||
pg_version,
|
||||
layers: RwLock::new(LayerMap::default()),
|
||||
layers: tokio::sync::RwLock::new(LayerMap::default()),
|
||||
|
||||
walredo_mgr,
|
||||
walreceiver,
|
||||
@@ -1387,7 +1390,7 @@ impl Timeline {
|
||||
layer_flush_start_tx,
|
||||
layer_flush_done_tx,
|
||||
|
||||
write_lock: Mutex::new(()),
|
||||
write_lock: tokio::sync::Mutex::new(()),
|
||||
layer_removal_cs: Default::default(),
|
||||
|
||||
gc_info: std::sync::RwLock::new(GcInfo {
|
||||
@@ -1493,8 +1496,8 @@ impl Timeline {
|
||||
/// Scan the timeline directory to populate the layer map.
|
||||
/// Returns all timeline-related files that were found and loaded.
|
||||
///
|
||||
pub(super) fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> {
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
pub(super) async fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> {
|
||||
let mut layers = self.layers.write().await;
|
||||
let mut updates = layers.batch_update();
|
||||
let mut num_layers = 0;
|
||||
|
||||
@@ -1623,7 +1626,7 @@ impl Timeline {
|
||||
|
||||
// We're holding a layer map lock for a while but this
|
||||
// method is only called during init so it's fine.
|
||||
let mut layer_map = self.layers.write().unwrap();
|
||||
let mut layer_map = self.layers.write().await;
|
||||
let mut updates = layer_map.batch_update();
|
||||
for remote_layer_name in &index_part.timeline_layers {
|
||||
let local_layer = local_only_layers.remove(remote_layer_name);
|
||||
@@ -1776,7 +1779,7 @@ impl Timeline {
|
||||
let local_layers = self
|
||||
.layers
|
||||
.read()
|
||||
.unwrap()
|
||||
.await
|
||||
.iter_historic_layers()
|
||||
.map(|l| (l.filename(), l))
|
||||
.collect::<HashMap<_, _>>();
|
||||
@@ -2128,8 +2131,8 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
fn find_layer(&self, layer_file_name: &str) -> Option<Arc<dyn PersistentLayer>> {
|
||||
for historic_layer in self.layers.read().unwrap().iter_historic_layers() {
|
||||
async fn find_layer(&self, layer_file_name: &str) -> Option<Arc<dyn PersistentLayer>> {
|
||||
for historic_layer in self.layers.read().await.iter_historic_layers() {
|
||||
let historic_layer_name = historic_layer.filename().file_name();
|
||||
if layer_file_name == historic_layer_name {
|
||||
return Some(historic_layer);
|
||||
@@ -2212,13 +2215,31 @@ impl Timeline {
|
||||
///
|
||||
/// This function takes the current timeline's locked LayerMap as an argument,
|
||||
/// so callers can avoid potential race conditions.
|
||||
///
|
||||
// TODO: find a way to not hold the Timeline::layers lock during get_value_reconstruct_data calls.
|
||||
//
|
||||
// Since these calls do local disk IO, they'll be reasonably fast, until come disk IOPS bound.
|
||||
// We have lots of headroom on current pageservers, so, it's going to be fine for now.
|
||||
//
|
||||
// We can't use tokio::sync::RwLock that easily because its guard is not Send, but,
|
||||
// many tasks that access Timeline::layers run inside task_mgr tasks, which are required
|
||||
// to be Send. It has been tried in origin/problame/asyncify-get-reconstruct-data--tokio-sync.
|
||||
//
|
||||
// The solution will probably be to have an immutable + multi-versioned layer map, allowing
|
||||
// us to grab a snapshot of the layer map once and execute this function on the snapshot.
|
||||
//
|
||||
// Or, we could invest time to figure out whether we can drop the layer map lock after
|
||||
// we grabbed the layer, do the IO, re-aquire, and continue the traversal.
|
||||
//
|
||||
// (Why is this allow() not inside the function? Because clippy doesn't respect it then).
|
||||
#[allow(clippy::await_holding_lock)]
|
||||
async fn get_reconstruct_data(
|
||||
&self,
|
||||
key: Key,
|
||||
request_lsn: Lsn,
|
||||
reconstruct_state: &mut ValueReconstructState,
|
||||
mut reconstruct_state: ValueReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), PageReconstructError> {
|
||||
) -> Result<ValueReconstructState, PageReconstructError> {
|
||||
// Start from the current timeline.
|
||||
let mut timeline_owned;
|
||||
let mut timeline = self;
|
||||
@@ -2245,12 +2266,12 @@ impl Timeline {
|
||||
// The function should have updated 'state'
|
||||
//info!("CALLED for {} at {}: {:?} with {} records, cached {}", key, cont_lsn, result, reconstruct_state.records.len(), cached_lsn);
|
||||
match result {
|
||||
ValueReconstructResult::Complete => return Ok(()),
|
||||
ValueReconstructResult::Complete => return Ok(reconstruct_state),
|
||||
ValueReconstructResult::Continue => {
|
||||
// If we reached an earlier cached page image, we're done.
|
||||
if cont_lsn == cached_lsn + 1 {
|
||||
self.metrics.materialized_page_cache_hit_counter.inc_by(1);
|
||||
return Ok(());
|
||||
return Ok(reconstruct_state);
|
||||
}
|
||||
if prev_lsn <= cont_lsn {
|
||||
// Didn't make any progress in last iteration. Error out to avoid
|
||||
@@ -2336,7 +2357,7 @@ impl Timeline {
|
||||
#[allow(clippy::never_loop)] // see comment at bottom of this loop
|
||||
'layer_map_search: loop {
|
||||
let remote_layer = {
|
||||
let layers = timeline.layers.read().unwrap();
|
||||
let layers = timeline.layers.read().await;
|
||||
|
||||
// Check the open and frozen in-memory layers first, in order from newest
|
||||
// to oldest.
|
||||
@@ -2347,13 +2368,19 @@ impl Timeline {
|
||||
// Get all the data needed to reconstruct the page version from this layer.
|
||||
// But if we have an older cached page image, no need to go past that.
|
||||
let lsn_floor = max(cached_lsn + 1, start_lsn);
|
||||
result = match open_layer.get_value_reconstruct_data(
|
||||
key,
|
||||
lsn_floor..cont_lsn,
|
||||
reconstruct_state,
|
||||
ctx,
|
||||
) {
|
||||
Ok(result) => result,
|
||||
result = match Arc::clone(open_layer)
|
||||
.get_value_reconstruct_data(
|
||||
key,
|
||||
lsn_floor..cont_lsn,
|
||||
reconstruct_state,
|
||||
ctx.attached_child(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok((new_reconstruct_state, result)) => {
|
||||
reconstruct_state = new_reconstruct_state;
|
||||
result
|
||||
}
|
||||
Err(e) => return Err(PageReconstructError::from(e)),
|
||||
};
|
||||
cont_lsn = lsn_floor;
|
||||
@@ -2373,13 +2400,19 @@ impl Timeline {
|
||||
if cont_lsn > start_lsn {
|
||||
//info!("CHECKING for {} at {} on frozen layer {}", key, cont_lsn, frozen_layer.filename().display());
|
||||
let lsn_floor = max(cached_lsn + 1, start_lsn);
|
||||
result = match frozen_layer.get_value_reconstruct_data(
|
||||
key,
|
||||
lsn_floor..cont_lsn,
|
||||
reconstruct_state,
|
||||
ctx,
|
||||
) {
|
||||
Ok(result) => result,
|
||||
result = match Arc::clone(frozen_layer)
|
||||
.get_value_reconstruct_data(
|
||||
key,
|
||||
lsn_floor..cont_lsn,
|
||||
reconstruct_state,
|
||||
ctx.attached_child(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok((new_reconstruct_state, result)) => {
|
||||
reconstruct_state = new_reconstruct_state;
|
||||
result
|
||||
}
|
||||
Err(e) => return Err(PageReconstructError::from(e)),
|
||||
};
|
||||
cont_lsn = lsn_floor;
|
||||
@@ -2407,13 +2440,19 @@ impl Timeline {
|
||||
// Get all the data needed to reconstruct the page version from this layer.
|
||||
// But if we have an older cached page image, no need to go past that.
|
||||
let lsn_floor = max(cached_lsn + 1, lsn_floor);
|
||||
result = match layer.get_value_reconstruct_data(
|
||||
key,
|
||||
lsn_floor..cont_lsn,
|
||||
reconstruct_state,
|
||||
ctx,
|
||||
) {
|
||||
Ok(result) => result,
|
||||
result = match Arc::clone(&layer)
|
||||
.get_value_reconstruct_data(
|
||||
key,
|
||||
lsn_floor..cont_lsn,
|
||||
reconstruct_state,
|
||||
ctx.attached_child(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok((new_reconstruct_state, result)) => {
|
||||
reconstruct_state = new_reconstruct_state;
|
||||
result
|
||||
}
|
||||
Err(e) => return Err(PageReconstructError::from(e)),
|
||||
};
|
||||
cont_lsn = lsn_floor;
|
||||
@@ -2515,9 +2554,16 @@ impl Timeline {
|
||||
///
|
||||
/// Get a handle to the latest layer for appending.
|
||||
///
|
||||
fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result<Arc<InMemoryLayer>> {
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
async fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result<Arc<InMemoryLayer>> {
|
||||
let mut layers = self.layers.write().await;
|
||||
self.get_layer_for_write_locked(lsn, &mut layers)
|
||||
}
|
||||
|
||||
fn get_layer_for_write_locked(
|
||||
&self,
|
||||
lsn: Lsn,
|
||||
layers: &mut LayerMapWriteLockGuard,
|
||||
) -> anyhow::Result<Arc<InMemoryLayer>> {
|
||||
ensure!(lsn.is_aligned());
|
||||
|
||||
let last_record_lsn = self.get_last_record_lsn();
|
||||
@@ -2560,16 +2606,29 @@ impl Timeline {
|
||||
Ok(layer)
|
||||
}
|
||||
|
||||
fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> anyhow::Result<()> {
|
||||
async fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> anyhow::Result<()> {
|
||||
//info!("PUT: key {} at {}", key, lsn);
|
||||
let layer = self.get_layer_for_write(lsn)?;
|
||||
let layer = self.get_layer_for_write(lsn).await?;
|
||||
layer.put_value(key, lsn, val)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn put_tombstone(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
|
||||
let layer = self.get_layer_for_write(lsn)?;
|
||||
layer.put_tombstone(key_range, lsn)?;
|
||||
fn put_value_locked(
|
||||
&self,
|
||||
key: Key,
|
||||
lsn: Lsn,
|
||||
val: &Value,
|
||||
pre_locked_layer_map: &mut LayerMapWriteLockGuard,
|
||||
) -> anyhow::Result<()> {
|
||||
//info!("PUT: key {} at {}", key, lsn);
|
||||
let layer = self.get_layer_for_write_locked(lsn, pre_locked_layer_map)?;
|
||||
layer.put_value(key, lsn, val)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn put_tombstone(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
|
||||
let layer = self.get_layer_for_write(lsn).await?;
|
||||
layer.put_tombstone(key_range, lsn).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -2581,15 +2640,15 @@ impl Timeline {
|
||||
self.last_record_lsn.advance(new_lsn);
|
||||
}
|
||||
|
||||
fn freeze_inmem_layer(&self, write_lock_held: bool) {
|
||||
async fn freeze_inmem_layer(&self, write_lock_held: bool) {
|
||||
// Freeze the current open in-memory layer. It will be written to disk on next
|
||||
// iteration.
|
||||
let _write_guard = if write_lock_held {
|
||||
None
|
||||
} else {
|
||||
Some(self.write_lock.lock().unwrap())
|
||||
Some(self.write_lock.lock().await)
|
||||
};
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
let mut layers = self.layers.write().await;
|
||||
if let Some(open_layer) = &layers.open_layer {
|
||||
let open_layer_rc = Arc::clone(open_layer);
|
||||
// Does this layer need freezing?
|
||||
@@ -2627,7 +2686,7 @@ impl Timeline {
|
||||
let flush_counter = *layer_flush_start_rx.borrow();
|
||||
let result = loop {
|
||||
let layer_to_flush = {
|
||||
let layers = self.layers.read().unwrap();
|
||||
let layers = self.layers.read().await;
|
||||
layers.frozen_layers.front().cloned()
|
||||
// drop 'layers' lock to allow concurrent reads and writes
|
||||
};
|
||||
@@ -2719,7 +2778,7 @@ impl Timeline {
|
||||
.await?
|
||||
} else {
|
||||
// normal case, write out a L0 delta layer file.
|
||||
let (delta_path, metadata) = self.create_delta_layer(&frozen_layer)?;
|
||||
let (delta_path, metadata) = self.create_delta_layer(&frozen_layer).await?;
|
||||
HashMap::from([(delta_path, metadata)])
|
||||
};
|
||||
|
||||
@@ -2728,7 +2787,7 @@ impl Timeline {
|
||||
// The new on-disk layers are now in the layer map. We can remove the
|
||||
// in-memory layer from the map now.
|
||||
{
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
let mut layers = self.layers.write().await;
|
||||
let l = layers.frozen_layers.pop_front();
|
||||
|
||||
// Only one thread may call this function at a time (for this
|
||||
@@ -2822,7 +2881,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
// Write out the given frozen in-memory layer as a new L0 delta file
|
||||
fn create_delta_layer(
|
||||
async fn create_delta_layer(
|
||||
&self,
|
||||
frozen_layer: &InMemoryLayer,
|
||||
) -> anyhow::Result<(LayerFileName, LayerFileMetadata)> {
|
||||
@@ -2846,7 +2905,7 @@ impl Timeline {
|
||||
|
||||
// Add it to the layer map
|
||||
let l = Arc::new(new_delta);
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
let mut layers = self.layers.write().await;
|
||||
let mut batch_updates = layers.batch_update();
|
||||
l.access_stats().record_residence_event(
|
||||
&batch_updates,
|
||||
@@ -2898,10 +2957,14 @@ impl Timeline {
|
||||
}
|
||||
|
||||
// Is it time to create a new image layer for the given partition?
|
||||
fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> anyhow::Result<bool> {
|
||||
async fn time_for_new_image_layer(
|
||||
&self,
|
||||
partition: &KeySpace,
|
||||
lsn: Lsn,
|
||||
) -> anyhow::Result<bool> {
|
||||
let threshold = self.get_image_creation_threshold();
|
||||
|
||||
let layers = self.layers.read().unwrap();
|
||||
let layers = self.layers.read().await;
|
||||
|
||||
let mut max_deltas = 0;
|
||||
|
||||
@@ -2972,7 +3035,7 @@ impl Timeline {
|
||||
for partition in partitioning.parts.iter() {
|
||||
let img_range = start..partition.ranges.last().unwrap().end;
|
||||
start = img_range.end;
|
||||
if force || self.time_for_new_image_layer(partition, lsn)? {
|
||||
if force || self.time_for_new_image_layer(partition, lsn).await? {
|
||||
let mut image_layer_writer = ImageLayerWriter::new(
|
||||
self.conf,
|
||||
self.timeline_id,
|
||||
@@ -3044,7 +3107,7 @@ impl Timeline {
|
||||
|
||||
let mut layer_paths_to_upload = HashMap::with_capacity(image_layers.len());
|
||||
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
let mut layers = self.layers.write().await;
|
||||
let mut updates = layers.batch_update();
|
||||
let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id);
|
||||
for l in image_layers {
|
||||
@@ -3111,9 +3174,8 @@ impl Timeline {
|
||||
target_file_size: u64,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<CompactLevel0Phase1Result, CompactionError> {
|
||||
let layers = self.layers.read().unwrap();
|
||||
let layers = self.layers.read().await;
|
||||
let mut level0_deltas = layers.get_level0_deltas()?;
|
||||
drop(layers);
|
||||
|
||||
// Only compact if enough layers have accumulated.
|
||||
let threshold = self.get_compaction_threshold();
|
||||
@@ -3234,7 +3296,6 @@ impl Timeline {
|
||||
// Determine N largest holes where N is number of compacted layers.
|
||||
let max_holes = deltas_to_compact.len();
|
||||
let last_record_lsn = self.get_last_record_lsn();
|
||||
let layers = self.layers.read().unwrap(); // Is'n it better to hold original layers lock till here?
|
||||
let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
|
||||
let min_hole_coverage_size = 3; // TODO: something more flexible?
|
||||
|
||||
@@ -3471,7 +3532,7 @@ impl Timeline {
|
||||
.context("wait for layer upload ops to complete")?;
|
||||
}
|
||||
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
let mut layers = self.layers.write().await;
|
||||
let mut updates = layers.batch_update();
|
||||
let mut new_layer_paths = HashMap::with_capacity(new_layers.len());
|
||||
for l in new_layers {
|
||||
@@ -3730,7 +3791,7 @@ impl Timeline {
|
||||
// 4. newer on-disk image layers cover the layer's whole key range
|
||||
//
|
||||
// TODO holding a write lock is too agressive and avoidable
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
let mut layers = self.layers.write().await;
|
||||
'outer: for l in layers.iter_historic_layers() {
|
||||
result.layers_total += 1;
|
||||
|
||||
@@ -4013,7 +4074,7 @@ impl Timeline {
|
||||
|
||||
// Download complete. Replace the RemoteLayer with the corresponding
|
||||
// Delta- or ImageLayer in the layer map.
|
||||
let mut layers = self_clone.layers.write().unwrap();
|
||||
let mut layers = self_clone.layers.write().await;
|
||||
let mut updates = layers.batch_update();
|
||||
let new_layer = remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size);
|
||||
{
|
||||
@@ -4171,7 +4232,7 @@ impl Timeline {
|
||||
) {
|
||||
let mut downloads = Vec::new();
|
||||
{
|
||||
let layers = self.layers.read().unwrap();
|
||||
let layers = self.layers.read().await;
|
||||
layers
|
||||
.iter_historic_layers()
|
||||
.filter_map(|l| l.downcast_remote_layer())
|
||||
@@ -4273,8 +4334,8 @@ impl LocalLayerInfoForDiskUsageEviction {
|
||||
}
|
||||
|
||||
impl Timeline {
|
||||
pub(crate) fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo {
|
||||
let layers = self.layers.read().unwrap();
|
||||
pub(crate) async fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo {
|
||||
let layers = self.layers.read().await;
|
||||
|
||||
let mut max_layer_size: Option<u64> = None;
|
||||
let mut resident_layers = Vec::new();
|
||||
@@ -4346,7 +4407,7 @@ fn layer_traversal_error(msg: String, path: Vec<TraversalPathItem>) -> PageRecon
|
||||
// but will cause large code changes.
|
||||
pub struct TimelineWriter<'a> {
|
||||
tl: &'a Timeline,
|
||||
_write_guard: MutexGuard<'a, ()>,
|
||||
_write_guard: tokio::sync::MutexGuard<'a, ()>,
|
||||
}
|
||||
|
||||
impl Deref for TimelineWriter<'_> {
|
||||
@@ -4362,12 +4423,23 @@ impl<'a> TimelineWriter<'a> {
|
||||
///
|
||||
/// This will implicitly extend the relation, if the page is beyond the
|
||||
/// current end-of-file.
|
||||
pub fn put(&self, key: Key, lsn: Lsn, value: &Value) -> anyhow::Result<()> {
|
||||
self.tl.put_value(key, lsn, value)
|
||||
pub async fn put(&self, key: Key, lsn: Lsn, value: &Value) -> anyhow::Result<()> {
|
||||
self.tl.put_value(key, lsn, value).await
|
||||
}
|
||||
|
||||
pub fn delete(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
|
||||
self.tl.put_tombstone(key_range, lsn)
|
||||
pub fn put_locked(
|
||||
&self,
|
||||
key: Key,
|
||||
lsn: Lsn,
|
||||
value: &Value,
|
||||
pre_locked_layer_map: &mut LayerMapWriteLockGuard,
|
||||
) -> anyhow::Result<()> {
|
||||
self.tl
|
||||
.put_value_locked(key, lsn, value, pre_locked_layer_map)
|
||||
}
|
||||
|
||||
pub async fn delete(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
|
||||
self.tl.put_tombstone(key_range, lsn).await
|
||||
}
|
||||
|
||||
/// Track the end of the latest digested WAL record.
|
||||
|
||||
@@ -185,7 +185,7 @@ impl Timeline {
|
||||
// We don't want to hold the layer map lock during eviction.
|
||||
// So, we just need to deal with this.
|
||||
let candidates: Vec<Arc<dyn PersistentLayer>> = {
|
||||
let layers = self.layers.read().unwrap();
|
||||
let layers = self.layers.read().await;
|
||||
let mut candidates = Vec::new();
|
||||
for hist_layer in layers.iter_historic_layers() {
|
||||
if hist_layer.is_remote_layer() {
|
||||
|
||||
@@ -1310,8 +1310,9 @@ mod tests {
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
let timeline = tenant
|
||||
.create_empty_timeline(TIMELINE_ID, Lsn(0), crate::DEFAULT_PG_VERSION, &ctx)
|
||||
.await
|
||||
.expect("Failed to create an empty timeline for dummy wal connection manager");
|
||||
let timeline = timeline.initialize(&ctx).unwrap();
|
||||
let timeline = timeline.initialize(&ctx).await.unwrap();
|
||||
|
||||
ConnectionManagerState {
|
||||
id: TenantTimelineId {
|
||||
|
||||
@@ -313,12 +313,15 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
}
|
||||
}
|
||||
|
||||
timeline.check_checkpoint_distance().with_context(|| {
|
||||
format!(
|
||||
"Failed to check checkpoint distance for timeline {}",
|
||||
timeline.timeline_id
|
||||
)
|
||||
})?;
|
||||
timeline
|
||||
.check_checkpoint_distance()
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to check checkpoint distance for timeline {}",
|
||||
timeline.timeline_id
|
||||
)
|
||||
})?;
|
||||
|
||||
if let Some(last_lsn) = status_update {
|
||||
let timeline_remote_consistent_lsn =
|
||||
|
||||
@@ -333,7 +333,7 @@ impl<'a> WalIngest<'a> {
|
||||
|
||||
// Now that this record has been fully handled, including updating the
|
||||
// checkpoint data, let the repository know that it is up-to-date to this LSN
|
||||
modification.commit()?;
|
||||
modification.commit().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -1200,7 +1200,7 @@ mod tests {
|
||||
let mut m = tline.begin_modification(Lsn(0x10));
|
||||
m.put_checkpoint(ZERO_CHECKPOINT.clone())?;
|
||||
m.put_relmap_file(0, 111, Bytes::from(""), ctx).await?; // dummy relmapper file
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
let walingest = WalIngest::new(tline, Lsn(0x10), ctx).await?;
|
||||
|
||||
Ok(walingest)
|
||||
@@ -1209,7 +1209,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_relsize() -> Result<()> {
|
||||
let (tenant, ctx) = TenantHarness::create("test_relsize")?.load().await;
|
||||
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?;
|
||||
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx).await?;
|
||||
let mut walingest = init_walingest_test(&tline, &ctx).await?;
|
||||
|
||||
let mut m = tline.begin_modification(Lsn(0x20));
|
||||
@@ -1217,22 +1217,22 @@ mod tests {
|
||||
walingest
|
||||
.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"), &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
let mut m = tline.begin_modification(Lsn(0x30));
|
||||
walingest
|
||||
.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 3"), &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
let mut m = tline.begin_modification(Lsn(0x40));
|
||||
walingest
|
||||
.put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1 at 4"), &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
let mut m = tline.begin_modification(Lsn(0x50));
|
||||
walingest
|
||||
.put_rel_page_image(&mut m, TESTREL_A, 2, TEST_IMG("foo blk 2 at 5"), &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
|
||||
assert_current_logical_size(&tline, Lsn(0x50));
|
||||
|
||||
@@ -1318,7 +1318,7 @@ mod tests {
|
||||
walingest
|
||||
.put_rel_truncation(&mut m, TESTREL_A, 2, &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
assert_current_logical_size(&tline, Lsn(0x60));
|
||||
|
||||
// Check reported size and contents after truncation
|
||||
@@ -1360,7 +1360,7 @@ mod tests {
|
||||
walingest
|
||||
.put_rel_truncation(&mut m, TESTREL_A, 0, &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Lsn(0x68), false, &ctx)
|
||||
@@ -1373,7 +1373,7 @@ mod tests {
|
||||
walingest
|
||||
.put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1"), &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Lsn(0x70), false, &ctx)
|
||||
@@ -1398,7 +1398,7 @@ mod tests {
|
||||
walingest
|
||||
.put_rel_page_image(&mut m, TESTREL_A, 1500, TEST_IMG("foo blk 1500"), &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Lsn(0x80), false, &ctx)
|
||||
@@ -1428,14 +1428,14 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_drop_extend() -> Result<()> {
|
||||
let (tenant, ctx) = TenantHarness::create("test_drop_extend")?.load().await;
|
||||
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?;
|
||||
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx).await?;
|
||||
let mut walingest = init_walingest_test(&tline, &ctx).await?;
|
||||
|
||||
let mut m = tline.begin_modification(Lsn(0x20));
|
||||
walingest
|
||||
.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"), &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
|
||||
// Check that rel exists and size is correct
|
||||
assert_eq!(
|
||||
@@ -1454,7 +1454,7 @@ mod tests {
|
||||
// Drop rel
|
||||
let mut m = tline.begin_modification(Lsn(0x30));
|
||||
walingest.put_rel_drop(&mut m, TESTREL_A, &ctx).await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
|
||||
// Check that rel is not visible anymore
|
||||
assert_eq!(
|
||||
@@ -1472,7 +1472,7 @@ mod tests {
|
||||
walingest
|
||||
.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 4"), &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
|
||||
// Check that rel exists and size is correct
|
||||
assert_eq!(
|
||||
@@ -1497,7 +1497,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_truncate_extend() -> Result<()> {
|
||||
let (tenant, ctx) = TenantHarness::create("test_truncate_extend")?.load().await;
|
||||
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?;
|
||||
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx).await?;
|
||||
let mut walingest = init_walingest_test(&tline, &ctx).await?;
|
||||
|
||||
// Create a 20 MB relation (the size is arbitrary)
|
||||
@@ -1509,7 +1509,7 @@ mod tests {
|
||||
.put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data), &ctx)
|
||||
.await?;
|
||||
}
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
|
||||
// The relation was created at LSN 20, not visible at LSN 1 yet.
|
||||
assert_eq!(
|
||||
@@ -1554,7 +1554,7 @@ mod tests {
|
||||
walingest
|
||||
.put_rel_truncation(&mut m, TESTREL_A, 1, &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
|
||||
// Check reported size and contents after truncation
|
||||
assert_eq!(
|
||||
@@ -1603,7 +1603,7 @@ mod tests {
|
||||
.put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data), &ctx)
|
||||
.await?;
|
||||
}
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
|
||||
assert_eq!(
|
||||
tline
|
||||
@@ -1637,7 +1637,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_large_rel() -> Result<()> {
|
||||
let (tenant, ctx) = TenantHarness::create("test_large_rel")?.load().await;
|
||||
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?;
|
||||
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx).await?;
|
||||
let mut walingest = init_walingest_test(&tline, &ctx).await?;
|
||||
|
||||
let mut lsn = 0x10;
|
||||
@@ -1648,7 +1648,7 @@ mod tests {
|
||||
walingest
|
||||
.put_rel_page_image(&mut m, TESTREL_A, blknum as BlockNumber, img, &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
}
|
||||
|
||||
assert_current_logical_size(&tline, Lsn(lsn));
|
||||
@@ -1664,7 +1664,7 @@ mod tests {
|
||||
walingest
|
||||
.put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE, &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
assert_eq!(
|
||||
tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
|
||||
RELSEG_SIZE
|
||||
@@ -1677,7 +1677,7 @@ mod tests {
|
||||
walingest
|
||||
.put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE - 1, &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
assert_eq!(
|
||||
tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
|
||||
RELSEG_SIZE - 1
|
||||
@@ -1693,7 +1693,7 @@ mod tests {
|
||||
walingest
|
||||
.put_rel_truncation(&mut m, TESTREL_A, size as BlockNumber, &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
assert_eq!(
|
||||
tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
|
||||
size as BlockNumber
|
||||
|
||||
Reference in New Issue
Block a user