mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-14 11:40:38 +00:00
Compare commits
12 Commits
undo_unlog
...
vlad/write
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
62cefb38b3 | ||
|
|
3c3cb8b0af | ||
|
|
597375c874 | ||
|
|
8b042622cf | ||
|
|
43bb10a58d | ||
|
|
ccdf185b8e | ||
|
|
b27368bfec | ||
|
|
4a84446b78 | ||
|
|
0008ee81a8 | ||
|
|
5b30b9fe11 | ||
|
|
74745c4e7a | ||
|
|
5a5fa2158b |
@@ -24,7 +24,6 @@ use tracing::*;
|
||||
use tokio_tar::{Builder, EntryType, Header};
|
||||
|
||||
use crate::context::RequestContext;
|
||||
use crate::pgdatadir_mapping::Version;
|
||||
use crate::tenant::Timeline;
|
||||
use pageserver_api::reltag::{RelTag, SlruKind};
|
||||
|
||||
@@ -261,7 +260,7 @@ where
|
||||
// Gather non-relational files from object storage pages.
|
||||
let slru_partitions = self
|
||||
.timeline
|
||||
.get_slru_keyspace(Version::Lsn(self.lsn), self.ctx)
|
||||
.get_slru_keyspace(self.lsn, self.ctx)
|
||||
.await?
|
||||
.partition(Timeline::MAX_GET_VECTORED_KEYS * BLCKSZ as u64);
|
||||
|
||||
@@ -288,7 +287,7 @@ where
|
||||
// Otherwise only include init forks of unlogged relations.
|
||||
let rels = self
|
||||
.timeline
|
||||
.list_rels(spcnode, dbnode, Version::Lsn(self.lsn), self.ctx)
|
||||
.list_rels(spcnode, dbnode, self.lsn, self.ctx)
|
||||
.await?;
|
||||
for &rel in rels.iter() {
|
||||
// Send init fork as main fork to provide well formed empty
|
||||
@@ -363,7 +362,7 @@ where
|
||||
async fn add_rel(&mut self, src: RelTag, dst: RelTag) -> anyhow::Result<()> {
|
||||
let nblocks = self
|
||||
.timeline
|
||||
.get_rel_size(src, Version::Lsn(self.lsn), false, self.ctx)
|
||||
.get_rel_size(src, self.lsn, false, self.ctx)
|
||||
.await?;
|
||||
|
||||
// If the relation is empty, create an empty file
|
||||
@@ -384,7 +383,7 @@ where
|
||||
for blknum in startblk..endblk {
|
||||
let img = self
|
||||
.timeline
|
||||
.get_rel_page_at_lsn(src, blknum, Version::Lsn(self.lsn), false, self.ctx)
|
||||
.get_rel_page_at_lsn(src, blknum, self.lsn, false, self.ctx)
|
||||
.await?;
|
||||
segment_data.extend_from_slice(&img[..]);
|
||||
}
|
||||
@@ -415,7 +414,7 @@ where
|
||||
let relmap_img = if has_relmap_file {
|
||||
let img = self
|
||||
.timeline
|
||||
.get_relmap_file(spcnode, dbnode, Version::Lsn(self.lsn), self.ctx)
|
||||
.get_relmap_file(spcnode, dbnode, self.lsn, self.ctx)
|
||||
.await?;
|
||||
|
||||
ensure!(
|
||||
@@ -462,7 +461,7 @@ where
|
||||
if !has_relmap_file
|
||||
&& self
|
||||
.timeline
|
||||
.list_rels(spcnode, dbnode, Version::Lsn(self.lsn), self.ctx)
|
||||
.list_rels(spcnode, dbnode, self.lsn, self.ctx)
|
||||
.await?
|
||||
.is_empty()
|
||||
{
|
||||
|
||||
@@ -81,8 +81,6 @@ pub mod defaults {
|
||||
pub const DEFAULT_HEATMAP_UPLOAD_CONCURRENCY: usize = 8;
|
||||
pub const DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY: usize = 1;
|
||||
|
||||
pub const DEFAULT_INGEST_BATCH_SIZE: u64 = 100;
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
pub const DEFAULT_VIRTUAL_FILE_IO_ENGINE: &str = "tokio-epoll-uring";
|
||||
|
||||
@@ -109,7 +107,6 @@ pub mod defaults {
|
||||
#wait_lsn_timeout = '{DEFAULT_WAIT_LSN_TIMEOUT}'
|
||||
#wal_redo_timeout = '{DEFAULT_WAL_REDO_TIMEOUT}'
|
||||
|
||||
#page_cache_size = {DEFAULT_PAGE_CACHE_SIZE}
|
||||
#max_file_descriptors = {DEFAULT_MAX_FILE_DESCRIPTORS}
|
||||
|
||||
# initial superuser role name to use when creating a new tenant
|
||||
@@ -130,7 +127,6 @@ pub mod defaults {
|
||||
|
||||
#background_task_maximum_delay = '{DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY}'
|
||||
|
||||
#ingest_batch_size = {DEFAULT_INGEST_BATCH_SIZE}
|
||||
|
||||
#virtual_file_io_engine = '{DEFAULT_VIRTUAL_FILE_IO_ENGINE}'
|
||||
|
||||
@@ -273,9 +269,6 @@ pub struct PageServerConf {
|
||||
/// deprioritises secondary downloads vs. remote storage operations for attached tenants.
|
||||
pub secondary_download_concurrency: usize,
|
||||
|
||||
/// Maximum number of WAL records to be ingested and committed at the same time
|
||||
pub ingest_batch_size: u64,
|
||||
|
||||
pub virtual_file_io_engine: virtual_file::IoEngineKind,
|
||||
|
||||
pub get_vectored_impl: GetVectoredImpl,
|
||||
@@ -400,9 +393,8 @@ struct PageServerConfigBuilder {
|
||||
control_plane_emergency_mode: BuilderValue<bool>,
|
||||
|
||||
heatmap_upload_concurrency: BuilderValue<usize>,
|
||||
secondary_download_concurrency: BuilderValue<usize>,
|
||||
|
||||
ingest_batch_size: BuilderValue<u64>,
|
||||
secondary_download_concurrency: BuilderValue<usize>,
|
||||
|
||||
virtual_file_io_engine: BuilderValue<virtual_file::IoEngineKind>,
|
||||
|
||||
@@ -490,8 +482,6 @@ impl PageServerConfigBuilder {
|
||||
heatmap_upload_concurrency: Set(DEFAULT_HEATMAP_UPLOAD_CONCURRENCY),
|
||||
secondary_download_concurrency: Set(DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY),
|
||||
|
||||
ingest_batch_size: Set(DEFAULT_INGEST_BATCH_SIZE),
|
||||
|
||||
virtual_file_io_engine: Set(DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap()),
|
||||
|
||||
get_vectored_impl: Set(DEFAULT_GET_VECTORED_IMPL.parse().unwrap()),
|
||||
@@ -659,10 +649,6 @@ impl PageServerConfigBuilder {
|
||||
self.secondary_download_concurrency = BuilderValue::Set(value)
|
||||
}
|
||||
|
||||
pub fn ingest_batch_size(&mut self, ingest_batch_size: u64) {
|
||||
self.ingest_batch_size = BuilderValue::Set(ingest_batch_size)
|
||||
}
|
||||
|
||||
pub fn virtual_file_io_engine(&mut self, value: virtual_file::IoEngineKind) {
|
||||
self.virtual_file_io_engine = BuilderValue::Set(value);
|
||||
}
|
||||
@@ -734,7 +720,6 @@ impl PageServerConfigBuilder {
|
||||
control_plane_emergency_mode,
|
||||
heatmap_upload_concurrency,
|
||||
secondary_download_concurrency,
|
||||
ingest_batch_size,
|
||||
get_vectored_impl,
|
||||
max_vectored_read_bytes,
|
||||
validate_vectored_get,
|
||||
@@ -1013,7 +998,6 @@ impl PageServerConf {
|
||||
"secondary_download_concurrency" => {
|
||||
builder.secondary_download_concurrency(parse_toml_u64(key, item)? as usize)
|
||||
},
|
||||
"ingest_batch_size" => builder.ingest_batch_size(parse_toml_u64(key, item)?),
|
||||
"virtual_file_io_engine" => {
|
||||
builder.virtual_file_io_engine(parse_toml_from_str("virtual_file_io_engine", item)?)
|
||||
}
|
||||
@@ -1105,7 +1089,6 @@ impl PageServerConf {
|
||||
control_plane_emergency_mode: false,
|
||||
heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY,
|
||||
secondary_download_concurrency: defaults::DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY,
|
||||
ingest_batch_size: defaults::DEFAULT_INGEST_BATCH_SIZE,
|
||||
virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(),
|
||||
get_vectored_impl: defaults::DEFAULT_GET_VECTORED_IMPL.parse().unwrap(),
|
||||
max_vectored_read_bytes: MaxVectoredReadBytes(
|
||||
@@ -1343,7 +1326,6 @@ background_task_maximum_delay = '334 s'
|
||||
control_plane_emergency_mode: false,
|
||||
heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY,
|
||||
secondary_download_concurrency: defaults::DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY,
|
||||
ingest_batch_size: defaults::DEFAULT_INGEST_BATCH_SIZE,
|
||||
virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(),
|
||||
get_vectored_impl: defaults::DEFAULT_GET_VECTORED_IMPL.parse().unwrap(),
|
||||
max_vectored_read_bytes: MaxVectoredReadBytes(
|
||||
@@ -1415,7 +1397,6 @@ background_task_maximum_delay = '334 s'
|
||||
control_plane_emergency_mode: false,
|
||||
heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY,
|
||||
secondary_download_concurrency: defaults::DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY,
|
||||
ingest_batch_size: 100,
|
||||
virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(),
|
||||
get_vectored_impl: defaults::DEFAULT_GET_VECTORED_IMPL.parse().unwrap(),
|
||||
max_vectored_read_bytes: MaxVectoredReadBytes(
|
||||
@@ -1424,6 +1405,7 @@ background_task_maximum_delay = '334 s'
|
||||
),
|
||||
validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
|
||||
ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB
|
||||
heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY
|
||||
},
|
||||
"Should be able to parse all basic config values correctly"
|
||||
);
|
||||
|
||||
@@ -14,7 +14,6 @@ use tracing::*;
|
||||
use walkdir::WalkDir;
|
||||
|
||||
use crate::context::RequestContext;
|
||||
use crate::metrics::WAL_INGEST;
|
||||
use crate::pgdatadir_mapping::*;
|
||||
use crate::tenant::Timeline;
|
||||
use crate::walingest::WalIngest;
|
||||
@@ -305,16 +304,13 @@ async fn import_wal(
|
||||
waldecoder.feed_bytes(&buf);
|
||||
|
||||
let mut nrecords = 0;
|
||||
let mut modification = tline.begin_modification(last_lsn);
|
||||
let mut modification = tline.begin_modification(endpoint);
|
||||
let mut decoded = DecodedWALRecord::default();
|
||||
while last_lsn <= endpoint {
|
||||
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
|
||||
walingest
|
||||
.ingest_record(recdata, lsn, &mut modification, &mut decoded, ctx)
|
||||
.await?;
|
||||
WAL_INGEST.records_committed.inc();
|
||||
|
||||
modification.commit(ctx).await?;
|
||||
last_lsn = lsn;
|
||||
|
||||
nrecords += 1;
|
||||
@@ -444,14 +440,13 @@ pub async fn import_wal_from_tar(
|
||||
|
||||
waldecoder.feed_bytes(&bytes[offset..]);
|
||||
|
||||
let mut modification = tline.begin_modification(last_lsn);
|
||||
let mut modification = tline.begin_modification(end_lsn);
|
||||
let mut decoded = DecodedWALRecord::default();
|
||||
while last_lsn <= end_lsn {
|
||||
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
|
||||
walingest
|
||||
.ingest_record(recdata, lsn, &mut modification, &mut decoded, ctx)
|
||||
.await?;
|
||||
modification.commit(ctx).await?;
|
||||
last_lsn = lsn;
|
||||
|
||||
debug!("imported record at {} (end {})", lsn, end_lsn);
|
||||
|
||||
@@ -61,7 +61,6 @@ use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::import_datadir::import_wal_from_tar;
|
||||
use crate::metrics;
|
||||
use crate::metrics::LIVE_CONNECTIONS_COUNT;
|
||||
use crate::pgdatadir_mapping::Version;
|
||||
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
|
||||
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
|
||||
use crate::task_mgr;
|
||||
@@ -931,7 +930,7 @@ impl PageServerHandler {
|
||||
.await?;
|
||||
|
||||
let exists = timeline
|
||||
.get_rel_exists(req.rel, Version::Lsn(lsn), req.latest, ctx)
|
||||
.get_rel_exists(req.rel, lsn, req.latest, ctx)
|
||||
.await?;
|
||||
|
||||
Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse {
|
||||
@@ -958,9 +957,7 @@ impl PageServerHandler {
|
||||
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
|
||||
.await?;
|
||||
|
||||
let n_blocks = timeline
|
||||
.get_rel_size(req.rel, Version::Lsn(lsn), req.latest, ctx)
|
||||
.await?;
|
||||
let n_blocks = timeline.get_rel_size(req.rel, lsn, req.latest, ctx).await?;
|
||||
|
||||
Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse {
|
||||
n_blocks,
|
||||
@@ -987,13 +984,7 @@ impl PageServerHandler {
|
||||
.await?;
|
||||
|
||||
let total_blocks = timeline
|
||||
.get_db_size(
|
||||
DEFAULTTABLESPACE_OID,
|
||||
req.dbnode,
|
||||
Version::Lsn(lsn),
|
||||
req.latest,
|
||||
ctx,
|
||||
)
|
||||
.get_db_size(DEFAULTTABLESPACE_OID, req.dbnode, lsn, req.latest, ctx)
|
||||
.await?;
|
||||
let db_size = total_blocks as i64 * BLCKSZ as i64;
|
||||
|
||||
@@ -1165,7 +1156,7 @@ impl PageServerHandler {
|
||||
.await?;
|
||||
|
||||
let page = timeline
|
||||
.get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), req.latest, ctx)
|
||||
.get_rel_page_at_lsn(req.rel, req.blkno, lsn, req.latest, ctx)
|
||||
.await?;
|
||||
|
||||
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
|
||||
|
||||
@@ -156,7 +156,6 @@ impl Timeline {
|
||||
{
|
||||
DatadirModification {
|
||||
tline: self,
|
||||
pending_lsns: Vec::new(),
|
||||
pending_updates: HashMap::new(),
|
||||
pending_deletions: Vec::new(),
|
||||
pending_nblocks: 0,
|
||||
@@ -174,7 +173,7 @@ impl Timeline {
|
||||
&self,
|
||||
tag: RelTag,
|
||||
blknum: BlockNumber,
|
||||
version: Version<'_>,
|
||||
lsn: Lsn,
|
||||
latest: bool,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Bytes, PageReconstructError> {
|
||||
@@ -184,20 +183,17 @@ impl Timeline {
|
||||
));
|
||||
}
|
||||
|
||||
let nblocks = self.get_rel_size(tag, version, latest, ctx).await?;
|
||||
let nblocks = self.get_rel_size(tag, lsn, latest, ctx).await?;
|
||||
if blknum >= nblocks {
|
||||
debug!(
|
||||
"read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
|
||||
tag,
|
||||
blknum,
|
||||
version.get_lsn(),
|
||||
nblocks
|
||||
tag, blknum, lsn, nblocks
|
||||
);
|
||||
return Ok(ZERO_PAGE.clone());
|
||||
}
|
||||
|
||||
let key = rel_block_to_key(tag, blknum);
|
||||
version.get(self, key, ctx).await
|
||||
self.get(key, lsn, ctx).await
|
||||
}
|
||||
|
||||
// Get size of a database in blocks
|
||||
@@ -205,16 +201,16 @@ impl Timeline {
|
||||
&self,
|
||||
spcnode: Oid,
|
||||
dbnode: Oid,
|
||||
version: Version<'_>,
|
||||
lsn: Lsn,
|
||||
latest: bool,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<usize, PageReconstructError> {
|
||||
let mut total_blocks = 0;
|
||||
|
||||
let rels = self.list_rels(spcnode, dbnode, version, ctx).await?;
|
||||
let rels = self.list_rels(spcnode, dbnode, lsn, ctx).await?;
|
||||
|
||||
for rel in rels {
|
||||
let n_blocks = self.get_rel_size(rel, version, latest, ctx).await?;
|
||||
let n_blocks = self.get_rel_size(rel, lsn, latest, ctx).await?;
|
||||
total_blocks += n_blocks as usize;
|
||||
}
|
||||
Ok(total_blocks)
|
||||
@@ -224,7 +220,7 @@ impl Timeline {
|
||||
pub(crate) async fn get_rel_size(
|
||||
&self,
|
||||
tag: RelTag,
|
||||
version: Version<'_>,
|
||||
lsn: Lsn,
|
||||
latest: bool,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<BlockNumber, PageReconstructError> {
|
||||
@@ -234,12 +230,12 @@ impl Timeline {
|
||||
));
|
||||
}
|
||||
|
||||
if let Some(nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
|
||||
if let Some(nblocks) = self.get_cached_rel_size(&tag, lsn) {
|
||||
return Ok(nblocks);
|
||||
}
|
||||
|
||||
if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
|
||||
&& !self.get_rel_exists(tag, version, latest, ctx).await?
|
||||
&& !self.get_rel_exists(tag, lsn, latest, ctx).await?
|
||||
{
|
||||
// FIXME: Postgres sometimes calls smgrcreate() to create
|
||||
// FSM, and smgrnblocks() on it immediately afterwards,
|
||||
@@ -249,7 +245,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
let key = rel_size_to_key(tag);
|
||||
let mut buf = version.get(self, key, ctx).await?;
|
||||
let mut buf = self.get(key, lsn, ctx).await?;
|
||||
let nblocks = buf.get_u32_le();
|
||||
|
||||
if latest {
|
||||
@@ -260,7 +256,7 @@ impl Timeline {
|
||||
// latest=true, then it can not cause cache corruption, because with latest=true
|
||||
// pageserver choose max(request_lsn, last_written_lsn) and so cached value will be
|
||||
// associated with most recent value of LSN.
|
||||
self.update_cached_rel_size(tag, version.get_lsn(), nblocks);
|
||||
self.update_cached_rel_size(tag, lsn, nblocks);
|
||||
}
|
||||
Ok(nblocks)
|
||||
}
|
||||
@@ -269,7 +265,7 @@ impl Timeline {
|
||||
pub(crate) async fn get_rel_exists(
|
||||
&self,
|
||||
tag: RelTag,
|
||||
version: Version<'_>,
|
||||
lsn: Lsn,
|
||||
_latest: bool,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<bool, PageReconstructError> {
|
||||
@@ -280,12 +276,12 @@ impl Timeline {
|
||||
}
|
||||
|
||||
// first try to lookup relation in cache
|
||||
if let Some(_nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
|
||||
if let Some(_nblocks) = self.get_cached_rel_size(&tag, lsn) {
|
||||
return Ok(true);
|
||||
}
|
||||
// fetch directory listing
|
||||
let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
|
||||
let buf = version.get(self, key, ctx).await?;
|
||||
let buf = self.get(key, lsn, ctx).await?;
|
||||
|
||||
match RelDirectory::des(&buf).context("deserialization failure") {
|
||||
Ok(dir) => {
|
||||
@@ -305,12 +301,12 @@ impl Timeline {
|
||||
&self,
|
||||
spcnode: Oid,
|
||||
dbnode: Oid,
|
||||
version: Version<'_>,
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<HashSet<RelTag>, PageReconstructError> {
|
||||
// fetch directory listing
|
||||
let key = rel_dir_to_key(spcnode, dbnode);
|
||||
let buf = version.get(self, key, ctx).await?;
|
||||
let buf = self.get(key, lsn, ctx).await?;
|
||||
|
||||
match RelDirectory::des(&buf).context("deserialization failure") {
|
||||
Ok(dir) => {
|
||||
@@ -337,7 +333,7 @@ impl Timeline {
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Bytes, PageReconstructError> {
|
||||
let n_blocks = self
|
||||
.get_slru_segment_size(kind, segno, Version::Lsn(lsn), ctx)
|
||||
.get_slru_segment_size(kind, segno, lsn, ctx)
|
||||
.await?;
|
||||
let mut segment = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize);
|
||||
for blkno in 0..n_blocks {
|
||||
@@ -367,11 +363,11 @@ impl Timeline {
|
||||
&self,
|
||||
kind: SlruKind,
|
||||
segno: u32,
|
||||
version: Version<'_>,
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<BlockNumber, PageReconstructError> {
|
||||
let key = slru_segment_size_to_key(kind, segno);
|
||||
let mut buf = version.get(self, key, ctx).await?;
|
||||
let mut buf = self.get(key, lsn, ctx).await?;
|
||||
Ok(buf.get_u32_le())
|
||||
}
|
||||
|
||||
@@ -380,12 +376,12 @@ impl Timeline {
|
||||
&self,
|
||||
kind: SlruKind,
|
||||
segno: u32,
|
||||
version: Version<'_>,
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<bool, PageReconstructError> {
|
||||
// fetch directory listing
|
||||
let key = slru_dir_to_key(kind);
|
||||
let buf = version.get(self, key, ctx).await?;
|
||||
let buf = self.get(key, lsn, ctx).await?;
|
||||
|
||||
match SlruSegmentDirectory::des(&buf).context("deserialization failure") {
|
||||
Ok(dir) => {
|
||||
@@ -536,11 +532,11 @@ impl Timeline {
|
||||
mut f: impl FnMut(TimestampTz) -> ControlFlow<T>,
|
||||
) -> Result<T, PageReconstructError> {
|
||||
for segno in self
|
||||
.list_slru_segments(SlruKind::Clog, Version::Lsn(probe_lsn), ctx)
|
||||
.list_slru_segments(SlruKind::Clog, probe_lsn, ctx)
|
||||
.await?
|
||||
{
|
||||
let nblocks = self
|
||||
.get_slru_segment_size(SlruKind::Clog, segno, Version::Lsn(probe_lsn), ctx)
|
||||
.get_slru_segment_size(SlruKind::Clog, segno, probe_lsn, ctx)
|
||||
.await?;
|
||||
for blknum in (0..nblocks).rev() {
|
||||
let clog_page = self
|
||||
@@ -564,21 +560,21 @@ impl Timeline {
|
||||
|
||||
pub(crate) async fn get_slru_keyspace(
|
||||
&self,
|
||||
version: Version<'_>,
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<KeySpace, PageReconstructError> {
|
||||
let mut accum = KeySpaceAccum::new();
|
||||
|
||||
for kind in SlruKind::iter() {
|
||||
let mut segments: Vec<u32> = self
|
||||
.list_slru_segments(kind, version, ctx)
|
||||
.list_slru_segments(kind, lsn, ctx)
|
||||
.await?
|
||||
.into_iter()
|
||||
.collect();
|
||||
segments.sort_unstable();
|
||||
|
||||
for seg in segments {
|
||||
let block_count = self.get_slru_segment_size(kind, seg, version, ctx).await?;
|
||||
let block_count = self.get_slru_segment_size(kind, seg, lsn, ctx).await?;
|
||||
|
||||
accum.add_range(
|
||||
slru_block_to_key(kind, seg, 0)..slru_block_to_key(kind, seg, block_count),
|
||||
@@ -593,13 +589,13 @@ impl Timeline {
|
||||
pub(crate) async fn list_slru_segments(
|
||||
&self,
|
||||
kind: SlruKind,
|
||||
version: Version<'_>,
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<HashSet<u32>, PageReconstructError> {
|
||||
// fetch directory entry
|
||||
let key = slru_dir_to_key(kind);
|
||||
|
||||
let buf = version.get(self, key, ctx).await?;
|
||||
let buf = self.get(key, lsn, ctx).await?;
|
||||
match SlruSegmentDirectory::des(&buf).context("deserialization failure") {
|
||||
Ok(dir) => Ok(dir.segments),
|
||||
Err(e) => Err(PageReconstructError::from(e)),
|
||||
@@ -610,12 +606,12 @@ impl Timeline {
|
||||
&self,
|
||||
spcnode: Oid,
|
||||
dbnode: Oid,
|
||||
version: Version<'_>,
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Bytes, PageReconstructError> {
|
||||
let key = relmap_file_key(spcnode, dbnode);
|
||||
|
||||
let buf = version.get(self, key, ctx).await?;
|
||||
let buf = self.get(key, lsn, ctx).await?;
|
||||
Ok(buf)
|
||||
}
|
||||
|
||||
@@ -714,10 +710,7 @@ impl Timeline {
|
||||
|
||||
let mut total_size: u64 = 0;
|
||||
for (spcnode, dbnode) in dbdir.dbdirs.keys() {
|
||||
for rel in self
|
||||
.list_rels(*spcnode, *dbnode, Version::Lsn(lsn), ctx)
|
||||
.await?
|
||||
{
|
||||
for rel in self.list_rels(*spcnode, *dbnode, lsn, ctx).await? {
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(CalculateLogicalSizeError::Cancelled);
|
||||
}
|
||||
@@ -757,7 +750,7 @@ impl Timeline {
|
||||
result.add_key(rel_dir_to_key(spcnode, dbnode));
|
||||
|
||||
let mut rels: Vec<RelTag> = self
|
||||
.list_rels(spcnode, dbnode, Version::Lsn(lsn), ctx)
|
||||
.list_rels(spcnode, dbnode, lsn, ctx)
|
||||
.await?
|
||||
.into_iter()
|
||||
.collect();
|
||||
@@ -864,15 +857,14 @@ pub struct DatadirModification<'a> {
|
||||
/// in the state in 'tline' yet.
|
||||
pub tline: &'a Timeline,
|
||||
|
||||
/// Current LSN of the modification
|
||||
lsn: Lsn,
|
||||
/// Lsn assigned by begin_modification
|
||||
pub lsn: Lsn,
|
||||
|
||||
// The modifications are not applied directly to the underlying key-value store.
|
||||
// The put-functions add the modifications here, and they are flushed to the
|
||||
// underlying key-value store by the 'finish' function.
|
||||
pending_lsns: Vec<Lsn>,
|
||||
pending_updates: HashMap<Key, Vec<(Lsn, Value)>>,
|
||||
pending_deletions: Vec<(Range<Key>, Lsn)>,
|
||||
pending_updates: HashMap<Key, Value>,
|
||||
pending_deletions: Vec<Range<Key>>,
|
||||
pending_nblocks: i64,
|
||||
|
||||
/// For special "directory" keys that store key-value maps, track the size of the map
|
||||
@@ -881,26 +873,6 @@ pub struct DatadirModification<'a> {
|
||||
}
|
||||
|
||||
impl<'a> DatadirModification<'a> {
|
||||
/// Get the current lsn
|
||||
pub(crate) fn get_lsn(&self) -> Lsn {
|
||||
self.lsn
|
||||
}
|
||||
|
||||
/// Set the current lsn
|
||||
pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> anyhow::Result<()> {
|
||||
ensure!(
|
||||
lsn >= self.lsn,
|
||||
"setting an older lsn {} than {} is not allowed",
|
||||
lsn,
|
||||
self.lsn
|
||||
);
|
||||
if lsn > self.lsn {
|
||||
self.pending_lsns.push(self.lsn);
|
||||
self.lsn = lsn;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Initialize a completely new repository.
|
||||
///
|
||||
/// This inserts the directory metadata entries that are assumed to
|
||||
@@ -1086,9 +1058,11 @@ impl<'a> DatadirModification<'a> {
|
||||
dbnode: Oid,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
let req_lsn = self.tline.get_last_record_lsn();
|
||||
|
||||
let total_blocks = self
|
||||
.tline
|
||||
.get_db_size(spcnode, dbnode, Version::Modified(self), true, ctx)
|
||||
.get_db_size(spcnode, dbnode, req_lsn, true, ctx)
|
||||
.await?;
|
||||
|
||||
// Remove entry from dbdir
|
||||
@@ -1185,11 +1159,8 @@ impl<'a> DatadirModification<'a> {
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
|
||||
if self
|
||||
.tline
|
||||
.get_rel_exists(rel, Version::Modified(self), true, ctx)
|
||||
.await?
|
||||
{
|
||||
let last_lsn = self.tline.get_last_record_lsn();
|
||||
if self.tline.get_rel_exists(rel, last_lsn, true, ctx).await? {
|
||||
let size_key = rel_size_to_key(rel);
|
||||
// Fetch the old size first
|
||||
let old_size = self.get(size_key, ctx).await?.get_u32_le();
|
||||
@@ -1503,23 +1474,17 @@ impl<'a> DatadirModification<'a> {
|
||||
let mut writer = self.tline.writer().await;
|
||||
|
||||
// Flush relation and SLRU data blocks, keep metadata.
|
||||
let mut retained_pending_updates = HashMap::<_, Vec<_>>::new();
|
||||
for (key, values) in self.pending_updates.drain() {
|
||||
for (lsn, value) in values {
|
||||
if is_rel_block_key(&key) || is_slru_block_key(key) {
|
||||
// This bails out on first error without modifying pending_updates.
|
||||
// That's Ok, cf this function's doc comment.
|
||||
writer.put(key, lsn, &value, ctx).await?;
|
||||
} else {
|
||||
retained_pending_updates
|
||||
.entry(key)
|
||||
.or_default()
|
||||
.push((lsn, value));
|
||||
}
|
||||
let mut retained_pending_updates = HashMap::new();
|
||||
for (key, value) in self.pending_updates.drain() {
|
||||
if is_rel_block_key(&key) || is_slru_block_key(key) {
|
||||
// This bails out on first error without modifying pending_updates.
|
||||
// That's Ok, cf this function's doc comment.
|
||||
writer.put(key, self.lsn, &value, ctx).await?;
|
||||
} else {
|
||||
retained_pending_updates.insert(key, value);
|
||||
}
|
||||
}
|
||||
|
||||
self.pending_updates = retained_pending_updates;
|
||||
self.pending_updates.extend(retained_pending_updates);
|
||||
|
||||
if pending_nblocks != 0 {
|
||||
writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
|
||||
@@ -1540,37 +1505,18 @@ impl<'a> DatadirModification<'a> {
|
||||
///
|
||||
pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
|
||||
let mut writer = self.tline.writer().await;
|
||||
|
||||
let lsn = self.lsn;
|
||||
let pending_nblocks = self.pending_nblocks;
|
||||
self.pending_nblocks = 0;
|
||||
|
||||
if !self.pending_updates.is_empty() {
|
||||
// The put_batch call below expects expects the inputs to be sorted by Lsn,
|
||||
// so we do that first.
|
||||
let lsn_ordered_batch: VecMap<Lsn, (Key, Value)> = VecMap::from_iter(
|
||||
self.pending_updates
|
||||
.drain()
|
||||
.map(|(key, vals)| vals.into_iter().map(move |(lsn, val)| (lsn, (key, val))))
|
||||
.kmerge_by(|lhs, rhs| lhs.0 < rhs.0),
|
||||
VecMapOrdering::GreaterOrEqual,
|
||||
);
|
||||
|
||||
writer.put_batch(lsn_ordered_batch, ctx).await?;
|
||||
for (key, value) in self.pending_updates.drain() {
|
||||
writer.put(key, lsn, &value, ctx).await?;
|
||||
}
|
||||
for key_range in self.pending_deletions.drain(..) {
|
||||
writer.delete(key_range, lsn).await?;
|
||||
}
|
||||
|
||||
if !self.pending_deletions.is_empty() {
|
||||
writer.delete_batch(&self.pending_deletions).await?;
|
||||
self.pending_deletions.clear();
|
||||
}
|
||||
|
||||
self.pending_lsns.push(self.lsn);
|
||||
for pending_lsn in self.pending_lsns.drain(..) {
|
||||
// Ideally, we should be able to call writer.finish_write() only once
|
||||
// with the highest LSN. However, the last_record_lsn variable in the
|
||||
// timeline keeps track of the latest LSN and the immediate previous LSN
|
||||
// so we need to record every LSN to not leave a gap between them.
|
||||
writer.finish_write(pending_lsn);
|
||||
}
|
||||
writer.finish_write(lsn);
|
||||
|
||||
if pending_nblocks != 0 {
|
||||
writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
|
||||
@@ -1583,86 +1529,44 @@ impl<'a> DatadirModification<'a> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn len(&self) -> usize {
|
||||
self.pending_updates.len() + self.pending_deletions.len()
|
||||
pub(crate) fn is_empty(&self) -> bool {
|
||||
self.pending_updates.is_empty() && self.pending_deletions.is_empty()
|
||||
}
|
||||
|
||||
// Internal helper functions to batch the modifications
|
||||
|
||||
async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
|
||||
// Have we already updated the same key? Read the latest pending updated
|
||||
// Have we already updated the same key? Read the pending updated
|
||||
// version in that case.
|
||||
//
|
||||
// Note: we don't check pending_deletions. It is an error to request a
|
||||
// value that has been removed, deletion only avoids leaking storage.
|
||||
if let Some(values) = self.pending_updates.get(&key) {
|
||||
if let Some((_, value)) = values.last() {
|
||||
return if let Value::Image(img) = value {
|
||||
Ok(img.clone())
|
||||
} else {
|
||||
// Currently, we never need to read back a WAL record that we
|
||||
// inserted in the same "transaction". All the metadata updates
|
||||
// work directly with Images, and we never need to read actual
|
||||
// data pages. We could handle this if we had to, by calling
|
||||
// the walredo manager, but let's keep it simple for now.
|
||||
Err(PageReconstructError::from(anyhow::anyhow!(
|
||||
"unexpected pending WAL record"
|
||||
)))
|
||||
};
|
||||
if let Some(value) = self.pending_updates.get(&key) {
|
||||
if let Value::Image(img) = value {
|
||||
Ok(img.clone())
|
||||
} else {
|
||||
// Currently, we never need to read back a WAL record that we
|
||||
// inserted in the same "transaction". All the metadata updates
|
||||
// work directly with Images, and we never need to read actual
|
||||
// data pages. We could handle this if we had to, by calling
|
||||
// the walredo manager, but let's keep it simple for now.
|
||||
Err(PageReconstructError::from(anyhow::anyhow!(
|
||||
"unexpected pending WAL record"
|
||||
)))
|
||||
}
|
||||
} else {
|
||||
let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
|
||||
self.tline.get(key, lsn, ctx).await
|
||||
}
|
||||
let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
|
||||
self.tline.get(key, lsn, ctx).await
|
||||
}
|
||||
|
||||
fn put(&mut self, key: Key, val: Value) {
|
||||
let values = self.pending_updates.entry(key).or_default();
|
||||
// Replace the previous value if it exists at the same lsn
|
||||
if let Some((last_lsn, last_value)) = values.last_mut() {
|
||||
if *last_lsn == self.lsn {
|
||||
*last_value = val;
|
||||
return;
|
||||
}
|
||||
}
|
||||
values.push((self.lsn, val));
|
||||
self.pending_updates.insert(key, val);
|
||||
}
|
||||
|
||||
fn delete(&mut self, key_range: Range<Key>) {
|
||||
trace!("DELETE {}-{}", key_range.start, key_range.end);
|
||||
self.pending_deletions.push((key_range, self.lsn));
|
||||
}
|
||||
}
|
||||
|
||||
/// This struct facilitates accessing either a committed key from the timeline at a
|
||||
/// specific LSN, or the latest uncommitted key from a pending modification.
|
||||
/// During WAL ingestion, the records from multiple LSNs may be batched in the same
|
||||
/// modification before being flushed to the timeline. Hence, the routines in WalIngest
|
||||
/// need to look up the keys in the modification first before looking them up in the
|
||||
/// timeline to not miss the latest updates.
|
||||
#[derive(Clone, Copy)]
|
||||
pub enum Version<'a> {
|
||||
Lsn(Lsn),
|
||||
Modified(&'a DatadirModification<'a>),
|
||||
}
|
||||
|
||||
impl<'a> Version<'a> {
|
||||
async fn get(
|
||||
&self,
|
||||
timeline: &Timeline,
|
||||
key: Key,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Bytes, PageReconstructError> {
|
||||
match self {
|
||||
Version::Lsn(lsn) => timeline.get(key, *lsn, ctx).await,
|
||||
Version::Modified(modification) => modification.get(key, ctx).await,
|
||||
}
|
||||
}
|
||||
|
||||
fn get_lsn(&self) -> Lsn {
|
||||
match self {
|
||||
Version::Lsn(lsn) => *lsn,
|
||||
Version::Modified(modification) => modification.lsn,
|
||||
}
|
||||
self.pending_deletions.push(key_range);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -3312,7 +3312,7 @@ impl Tenant {
|
||||
)
|
||||
.context("Failed to create timeline data structure")?;
|
||||
|
||||
timeline_struct.init_empty_layer_map(start_lsn);
|
||||
timeline_struct.init_empty_timeline(start_lsn);
|
||||
|
||||
if let Err(e) = self
|
||||
.create_timeline_files(&create_guard.timeline_path)
|
||||
|
||||
@@ -60,8 +60,6 @@ pub mod defaults {
|
||||
// By default ingest enough WAL for two new L0 layers before checking if new image
|
||||
// image layers should be created.
|
||||
pub const DEFAULT_IMAGE_LAYER_CREATION_CHECK_THRESHOLD: u8 = 2;
|
||||
|
||||
pub const DEFAULT_INGEST_BATCH_SIZE: u64 = 100;
|
||||
}
|
||||
|
||||
#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
|
||||
@@ -74,7 +74,6 @@ pub struct LayerMap {
|
||||
// where the start LSN of the next InMemoryLayer that is to be created.
|
||||
//
|
||||
pub open_layer: Option<Arc<InMemoryLayer>>,
|
||||
pub next_open_layer_at: Option<Lsn>,
|
||||
|
||||
///
|
||||
/// Frozen layers, if any. Frozen layers are in-memory layers that
|
||||
@@ -552,12 +551,6 @@ impl LayerMap {
|
||||
where
|
||||
Pred: FnMut(&Arc<InMemoryLayer>) -> bool,
|
||||
{
|
||||
if let Some(open) = &self.open_layer {
|
||||
if pred(open) {
|
||||
return Some(open.clone());
|
||||
}
|
||||
}
|
||||
|
||||
self.frozen_layers.iter().rfind(|l| pred(l)).cloned()
|
||||
}
|
||||
|
||||
@@ -853,11 +846,6 @@ impl LayerMap {
|
||||
pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
|
||||
println!("Begin dump LayerMap");
|
||||
|
||||
println!("open_layer:");
|
||||
if let Some(open_layer) = &self.open_layer {
|
||||
open_layer.dump(verbose, ctx).await?;
|
||||
}
|
||||
|
||||
println!("frozen_layers:");
|
||||
for frozen_layer in self.frozen_layers.iter() {
|
||||
frozen_layer.dump(verbose, ctx).await?;
|
||||
|
||||
@@ -11,14 +11,16 @@ use crate::tenant::block_io::BlockReader;
|
||||
use crate::tenant::ephemeral_file::EphemeralFile;
|
||||
use crate::tenant::storage_layer::ValueReconstructResult;
|
||||
use crate::tenant::timeline::GetVectoredError;
|
||||
use crate::tenant::{PageReconstructError, Timeline};
|
||||
use crate::tenant::{AttachedTenantConf, PageReconstructError, Timeline};
|
||||
use crate::{page_cache, walrecord};
|
||||
use anyhow::{anyhow, ensure, Result};
|
||||
use arc_swap::ArcSwap;
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::models::InMemoryLayerInfo;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use std::collections::{BinaryHeap, HashMap, HashSet};
|
||||
use std::sync::{Arc, OnceLock};
|
||||
use std::time::Instant;
|
||||
use tracing::*;
|
||||
use utils::{bin_ser::BeSer, id::TimelineId, lsn::Lsn, vec_map::VecMap};
|
||||
// avoid binding to Write (conflicts with std::io::Write)
|
||||
@@ -29,7 +31,7 @@ use std::fmt::Write as _;
|
||||
use std::ops::Range;
|
||||
use std::sync::atomic::Ordering as AtomicOrdering;
|
||||
use std::sync::atomic::{AtomicU64, AtomicUsize};
|
||||
use tokio::sync::{RwLock, RwLockWriteGuard};
|
||||
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||
|
||||
use super::{
|
||||
DeltaLayerWriter, ResidentLayer, ValueReconstructSituation, ValueReconstructState,
|
||||
@@ -40,6 +42,7 @@ use super::{
|
||||
pub(crate) struct InMemoryLayerFileId(page_cache::FileId);
|
||||
|
||||
pub struct InMemoryLayer {
|
||||
tenant_conf: Arc<ArcSwap<AttachedTenantConf>>,
|
||||
conf: &'static PageServerConf,
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
@@ -49,6 +52,8 @@ pub struct InMemoryLayer {
|
||||
/// start is inclusive.
|
||||
start_lsn: Lsn,
|
||||
|
||||
opened_at: Instant,
|
||||
|
||||
/// Frozen layers have an exclusive end LSN.
|
||||
/// Writes are only allowed when this is `None`.
|
||||
end_lsn: OnceLock<Lsn>,
|
||||
@@ -230,6 +235,10 @@ impl InMemoryLayer {
|
||||
assert!(self.end_lsn.get().is_none());
|
||||
}
|
||||
|
||||
pub(crate) fn assert_frozen(&self) {
|
||||
assert!(self.end_lsn.get().is_some());
|
||||
}
|
||||
|
||||
pub(crate) fn end_lsn_or_max(&self) -> Lsn {
|
||||
self.end_lsn.get().copied().unwrap_or(Lsn::MAX)
|
||||
}
|
||||
@@ -444,6 +453,7 @@ impl InMemoryLayer {
|
||||
/// Create a new, empty, in-memory layer
|
||||
pub async fn create(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_conf: Arc<ArcSwap<AttachedTenantConf>>,
|
||||
timeline_id: TimelineId,
|
||||
tenant_shard_id: TenantShardId,
|
||||
start_lsn: Lsn,
|
||||
@@ -456,9 +466,11 @@ impl InMemoryLayer {
|
||||
Ok(InMemoryLayer {
|
||||
file_id: key,
|
||||
conf,
|
||||
tenant_conf,
|
||||
timeline_id,
|
||||
tenant_shard_id,
|
||||
start_lsn,
|
||||
opened_at: Instant::now(),
|
||||
end_lsn: OnceLock::new(),
|
||||
inner: RwLock::new(InMemoryLayerInner {
|
||||
index: HashMap::new(),
|
||||
@@ -472,7 +484,6 @@ impl InMemoryLayer {
|
||||
|
||||
/// Common subroutine of the public put_wal_record() and put_page_image() functions.
|
||||
/// Adds the page version to the in-memory tree
|
||||
|
||||
pub(crate) async fn put_value(
|
||||
&self,
|
||||
key: Key,
|
||||
@@ -494,6 +505,8 @@ impl InMemoryLayer {
|
||||
ctx: &RequestContext,
|
||||
) -> Result<()> {
|
||||
trace!("put_value key {} at {}/{}", key, self.timeline_id, lsn);
|
||||
let inner: &mut _ = &mut *self.inner.write().await;
|
||||
self.assert_writable();
|
||||
|
||||
let off = {
|
||||
locked_inner
|
||||
@@ -507,7 +520,7 @@ impl InMemoryLayer {
|
||||
.await?
|
||||
};
|
||||
|
||||
let vec_map = locked_inner.index.entry(key).or_default();
|
||||
let vec_map = inner.index.entry(key).or_default();
|
||||
let old = vec_map.append_or_update_last(lsn, off).unwrap().0;
|
||||
if old.is_some() {
|
||||
// We already had an entry for this LSN. That's odd..
|
||||
@@ -526,11 +539,13 @@ impl InMemoryLayer {
|
||||
inner.resource_units.publish_size(size)
|
||||
}
|
||||
|
||||
pub(crate) async fn put_tombstones(&self, _key_ranges: &[(Range<Key>, Lsn)]) -> Result<()> {
|
||||
pub(crate) async fn put_tombstones(&self, _ranges: Range<Key>, _lsn: Lsn) -> Result<()> {
|
||||
// TODO: Currently, we just leak the storage for any deleted keys
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Make the layer non-writeable. Only call once.
|
||||
/// Records the end_lsn for non-dropped layers.
|
||||
/// `end_lsn` is exclusive
|
||||
pub async fn freeze(&self, end_lsn: Lsn) {
|
||||
@@ -614,4 +629,74 @@ impl InMemoryLayer {
|
||||
let delta_layer = delta_layer_writer.finish(Key::MAX, timeline).await?;
|
||||
Ok(delta_layer)
|
||||
}
|
||||
|
||||
pub(crate) async fn should_roll(&self, write_size: u64, projected_lsn: Lsn) -> bool {
|
||||
let inner = self.inner.read().await;
|
||||
self.should_roll_locked(inner, write_size, projected_lsn)
|
||||
}
|
||||
|
||||
// TODO: maybe remove this?
|
||||
pub(crate) async fn try_should_roll(
|
||||
&self,
|
||||
write_size: u64,
|
||||
projected_lsn: Lsn,
|
||||
) -> Option<bool> {
|
||||
let inner = self.inner.try_read().ok()?;
|
||||
Some(self.should_roll_locked(inner, write_size, projected_lsn))
|
||||
}
|
||||
|
||||
fn should_roll_locked(
|
||||
&self,
|
||||
inner: RwLockReadGuard<'_, InMemoryLayerInner>,
|
||||
write_size: u64,
|
||||
projected_lsn: Lsn,
|
||||
) -> bool {
|
||||
let arc_guard = self.tenant_conf.load();
|
||||
// TODO: This is quick and dirty. Should only merge the configs we need.
|
||||
let tenant_conf = arc_guard
|
||||
.tenant_conf
|
||||
.merge(self.conf.default_tenant_conf.clone());
|
||||
|
||||
let layer_size = inner.file.len();
|
||||
let projected_layer_size = layer_size + write_size;
|
||||
let distance = projected_lsn.widening_sub(self.start_lsn);
|
||||
|
||||
// Rolling the open layer can be triggered by:
|
||||
// 1. The distance from the last LSN we rolled at. This bounds the amount of WAL that
|
||||
// the safekeepers need to store. For sharded tenants, we multiply by shard count to
|
||||
// account for how writes are distributed across shards: we expect each node to consume
|
||||
// 1/count of the LSN on average.
|
||||
// 2. The size of the currently open layer.
|
||||
// 3. The time since the last roll. It helps safekeepers to regard pageserver as caught
|
||||
// up and suspend activity.
|
||||
if distance
|
||||
>= tenant_conf.checkpoint_distance as i128
|
||||
* self.tenant_shard_id.shard_count.count() as i128
|
||||
{
|
||||
info!(
|
||||
"Will roll layer at {} with layer size {} due to LSN distance ({})",
|
||||
projected_lsn, layer_size, distance
|
||||
);
|
||||
|
||||
true
|
||||
} else if projected_layer_size >= tenant_conf.checkpoint_distance {
|
||||
info!(
|
||||
"Will roll layer at {} with layer size {} due to layer size ({})",
|
||||
projected_lsn, layer_size, projected_layer_size
|
||||
);
|
||||
|
||||
true
|
||||
} else if distance > 0 && self.opened_at.elapsed() >= tenant_conf.checkpoint_timeout {
|
||||
info!(
|
||||
"Will roll layer at {} with layer size {} due to time since last flush ({:?})",
|
||||
projected_lsn,
|
||||
layer_size,
|
||||
self.opened_at.elapsed()
|
||||
);
|
||||
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,7 +9,7 @@ pub mod uninit;
|
||||
mod walreceiver;
|
||||
|
||||
use anyhow::{anyhow, bail, ensure, Context, Result};
|
||||
use arc_swap::ArcSwap;
|
||||
use arc_swap::{ArcSwap, ArcSwapOption};
|
||||
use bytes::Bytes;
|
||||
use camino::Utf8Path;
|
||||
use enumset::EnumSet;
|
||||
@@ -223,6 +223,8 @@ pub struct Timeline {
|
||||
/// so that e.g. on-demand-download/eviction, and layer spreading, can operate just on `LayerFileManager`.
|
||||
pub(crate) layers: Arc<tokio::sync::RwLock<LayerManager>>,
|
||||
|
||||
open_layer: ArcSwapOption<InMemoryLayer>,
|
||||
|
||||
last_freeze_at: AtomicLsn,
|
||||
// Atomic would be more appropriate here.
|
||||
last_freeze_ts: RwLock<Instant>,
|
||||
@@ -1169,7 +1171,8 @@ impl Timeline {
|
||||
/// Flush to disk all data that was written with the put_* functions
|
||||
#[instrument(skip(self), fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%self.timeline_id))]
|
||||
pub(crate) async fn freeze_and_flush(&self) -> anyhow::Result<()> {
|
||||
self.freeze_inmem_layer(false).await;
|
||||
self.freeze_inmem_layer(false, self.get_last_record_lsn())
|
||||
.await;
|
||||
self.flush_frozen_layers_and_wait().await
|
||||
}
|
||||
|
||||
@@ -1184,13 +1187,8 @@ impl Timeline {
|
||||
return;
|
||||
};
|
||||
|
||||
let Ok(layers_guard) = self.layers.try_read() else {
|
||||
// Don't block if the layer lock is busy
|
||||
return;
|
||||
};
|
||||
|
||||
let Some(open_layer) = &layers_guard.layer_map().open_layer else {
|
||||
// No open layer, no work to do.
|
||||
let open_layer = self.open_layer.load();
|
||||
let Some(open_layer) = &*open_layer else {
|
||||
return;
|
||||
};
|
||||
|
||||
@@ -1214,17 +1212,7 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
let checkpoint_distance =
|
||||
checkpoint_distance_override.unwrap_or(self.get_checkpoint_distance());
|
||||
|
||||
if self.should_roll(
|
||||
current_size,
|
||||
current_size,
|
||||
checkpoint_distance,
|
||||
self.get_last_record_lsn(),
|
||||
self.last_freeze_at.load(),
|
||||
*self.last_freeze_ts.read().unwrap(),
|
||||
) {
|
||||
if open_layer.should_roll(0, current_lsn).await {
|
||||
match open_layer.info() {
|
||||
InMemoryLayerInfo::Frozen { lsn_start, lsn_end } => {
|
||||
// We may reach this point if the layer was already frozen by not yet flushed: flushing
|
||||
@@ -1233,14 +1221,7 @@ impl Timeline {
|
||||
"Not freezing open layer, it's already frozen ({lsn_start}..{lsn_end})"
|
||||
);
|
||||
}
|
||||
InMemoryLayerInfo::Open { .. } => {
|
||||
// Upgrade to a write lock and freeze the layer
|
||||
drop(layers_guard);
|
||||
let mut layers_guard = self.layers.write().await;
|
||||
layers_guard
|
||||
.try_freeze_in_memory_layer(current_lsn, &self.last_freeze_at)
|
||||
.await;
|
||||
}
|
||||
InMemoryLayerInfo::Open { .. } => self.freeze_inmem_layer(true, current_lsn).await,
|
||||
}
|
||||
self.flush_frozen_layers();
|
||||
}
|
||||
@@ -1520,7 +1501,8 @@ impl Timeline {
|
||||
let guard = self.layers.read().await;
|
||||
let layer_map = guard.layer_map();
|
||||
let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1);
|
||||
if let Some(open_layer) = &layer_map.open_layer {
|
||||
let open_layer = self.open_layer.load();
|
||||
if let Some(open_layer) = &*open_layer {
|
||||
in_memory_layers.push(open_layer.info());
|
||||
}
|
||||
for frozen_layer in &layer_map.frozen_layers {
|
||||
@@ -1580,53 +1562,6 @@ impl Timeline {
|
||||
Err(EvictionError::Timeout) => Ok(Some(false)),
|
||||
}
|
||||
}
|
||||
|
||||
fn should_roll(
|
||||
&self,
|
||||
layer_size: u64,
|
||||
projected_layer_size: u64,
|
||||
checkpoint_distance: u64,
|
||||
projected_lsn: Lsn,
|
||||
last_freeze_at: Lsn,
|
||||
last_freeze_ts: Instant,
|
||||
) -> bool {
|
||||
let distance = projected_lsn.widening_sub(last_freeze_at);
|
||||
|
||||
// Rolling the open layer can be triggered by:
|
||||
// 1. The distance from the last LSN we rolled at. This bounds the amount of WAL that
|
||||
// the safekeepers need to store. For sharded tenants, we multiply by shard count to
|
||||
// account for how writes are distributed across shards: we expect each node to consume
|
||||
// 1/count of the LSN on average.
|
||||
// 2. The size of the currently open layer.
|
||||
// 3. The time since the last roll. It helps safekeepers to regard pageserver as caught
|
||||
// up and suspend activity.
|
||||
if distance >= checkpoint_distance as i128 * self.shard_identity.count.count() as i128 {
|
||||
info!(
|
||||
"Will roll layer at {} with layer size {} due to LSN distance ({})",
|
||||
projected_lsn, layer_size, distance
|
||||
);
|
||||
|
||||
true
|
||||
} else if projected_layer_size >= checkpoint_distance {
|
||||
info!(
|
||||
"Will roll layer at {} with layer size {} due to layer size ({})",
|
||||
projected_lsn, layer_size, projected_layer_size
|
||||
);
|
||||
|
||||
true
|
||||
} else if distance > 0 && last_freeze_ts.elapsed() >= self.get_checkpoint_timeout() {
|
||||
info!(
|
||||
"Will roll layer at {} with layer size {} due to time since last flush ({:?})",
|
||||
projected_lsn,
|
||||
layer_size,
|
||||
last_freeze_ts.elapsed()
|
||||
);
|
||||
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Number of times we will compute partition within a checkpoint distance.
|
||||
@@ -1650,14 +1585,6 @@ impl Timeline {
|
||||
.unwrap_or(self.conf.default_tenant_conf.checkpoint_distance)
|
||||
}
|
||||
|
||||
fn get_checkpoint_timeout(&self) -> Duration {
|
||||
let tenant_conf = self.tenant_conf.load();
|
||||
tenant_conf
|
||||
.tenant_conf
|
||||
.checkpoint_timeout
|
||||
.unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
|
||||
}
|
||||
|
||||
fn get_compaction_target_size(&self) -> u64 {
|
||||
let tenant_conf = self.tenant_conf.load();
|
||||
tenant_conf
|
||||
@@ -1791,6 +1718,7 @@ impl Timeline {
|
||||
shard_identity,
|
||||
pg_version,
|
||||
layers: Default::default(),
|
||||
open_layer: Default::default(),
|
||||
|
||||
walredo_mgr,
|
||||
walreceiver: Mutex::new(None),
|
||||
@@ -1988,19 +1916,14 @@ impl Timeline {
|
||||
max_lsn_wal_lag,
|
||||
auth_token: crate::config::SAFEKEEPER_AUTH_TOKEN.get().cloned(),
|
||||
availability_zone: self.conf.availability_zone.clone(),
|
||||
ingest_batch_size: self.conf.ingest_batch_size,
|
||||
},
|
||||
broker_client,
|
||||
ctx,
|
||||
));
|
||||
}
|
||||
|
||||
/// Initialize with an empty layer map. Used when creating a new timeline.
|
||||
pub(super) fn init_empty_layer_map(&self, start_lsn: Lsn) {
|
||||
let mut layers = self.layers.try_write().expect(
|
||||
"in the context where we call this function, no other task has access to the object",
|
||||
);
|
||||
layers.initialize_empty(Lsn(start_lsn.0));
|
||||
pub(super) fn init_empty_timeline(&self, start_lsn: Lsn) {
|
||||
self.last_freeze_at.store(start_lsn)
|
||||
}
|
||||
|
||||
/// Scan the timeline directory, cleanup, populate the layer map, and schedule uploads for local-only
|
||||
@@ -2150,7 +2073,8 @@ impl Timeline {
|
||||
|
||||
let num_layers = loaded_layers.len();
|
||||
|
||||
guard.initialize_local_layers(loaded_layers, disk_consistent_lsn + 1);
|
||||
self.last_freeze_at.store(disk_consistent_lsn);
|
||||
guard.initialize_local_layers(loaded_layers);
|
||||
|
||||
if let Some(rtc) = self.remote_client.as_ref() {
|
||||
rtc.schedule_layer_file_deletion(&needs_cleanup)?;
|
||||
@@ -2789,7 +2713,8 @@ impl Timeline {
|
||||
|
||||
// Check the open and frozen in-memory layers first, in order from newest
|
||||
// to oldest.
|
||||
if let Some(open_layer) = &layers.open_layer {
|
||||
let open_layer = self.open_layer.load_full();
|
||||
if let Some(open_layer) = open_layer {
|
||||
let start_lsn = open_layer.get_lsn_range().start;
|
||||
if cont_lsn > start_lsn {
|
||||
//info!("CHECKING for {} at {} on open layer {}", key, cont_lsn, open_layer.filename().display());
|
||||
@@ -2987,10 +2912,19 @@ impl Timeline {
|
||||
let guard = timeline.layers.read().await;
|
||||
let layers = guard.layer_map();
|
||||
|
||||
let in_memory_layer = layers.find_in_memory_layer(|l| {
|
||||
let start_lsn = l.get_lsn_range().start;
|
||||
cont_lsn > start_lsn
|
||||
});
|
||||
// TODO: don't grab the lock just for the check
|
||||
let in_memory_layer = {
|
||||
let open_layer = timeline.open_layer.load_full();
|
||||
if let Some(open_layer) = open_layer {
|
||||
if cont_lsn >= open_layer.get_lsn_range().start {
|
||||
Some(open_layer.clone())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else {
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
match in_memory_layer {
|
||||
Some(l) => {
|
||||
@@ -3154,17 +3088,54 @@ impl Timeline {
|
||||
/// Get a handle to the latest layer for appending.
|
||||
///
|
||||
async fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result<Arc<InMemoryLayer>> {
|
||||
let mut guard = self.layers.write().await;
|
||||
let layer = guard
|
||||
.get_layer_for_write(
|
||||
lsn,
|
||||
self.get_last_record_lsn(),
|
||||
// Do we have a layer open for writing already?
|
||||
let open_layer = self.open_layer.load_full();
|
||||
if let Some(open_layer) = open_layer {
|
||||
if open_layer.get_lsn_range().start > lsn {
|
||||
bail!(
|
||||
"unexpected open layer in the future: open layers starts at {}, write lsn {}",
|
||||
open_layer.get_lsn_range().start,
|
||||
lsn
|
||||
);
|
||||
}
|
||||
|
||||
return Ok(open_layer);
|
||||
}
|
||||
|
||||
// No writeable layer yet. Validate and create one.
|
||||
ensure!(lsn.is_aligned());
|
||||
|
||||
let last_record_lsn = self.get_last_record_lsn();
|
||||
ensure!(
|
||||
lsn > last_record_lsn,
|
||||
"cannot modify relation after advancing last_record_lsn (incoming_lsn={}, last_record_lsn={})",
|
||||
lsn,
|
||||
last_record_lsn,
|
||||
);
|
||||
|
||||
let start_lsn = Lsn(self.last_freeze_at.load().0 + 1);
|
||||
|
||||
trace!(
|
||||
"creating in-memory layer at {}/{} for record at {}",
|
||||
self.timeline_id,
|
||||
start_lsn,
|
||||
lsn
|
||||
);
|
||||
|
||||
let new_layer = Arc::new(
|
||||
InMemoryLayer::create(
|
||||
self.conf,
|
||||
Arc::clone(&self.tenant_conf),
|
||||
self.timeline_id,
|
||||
self.tenant_shard_id,
|
||||
start_lsn,
|
||||
)
|
||||
.await?;
|
||||
Ok(layer)
|
||||
.await?,
|
||||
);
|
||||
|
||||
self.open_layer.store(Some(Arc::clone(&new_layer)));
|
||||
|
||||
Ok(new_layer)
|
||||
}
|
||||
|
||||
pub(crate) fn finish_write(&self, new_lsn: Lsn) {
|
||||
@@ -3174,7 +3145,7 @@ impl Timeline {
|
||||
self.last_record_lsn.advance(new_lsn);
|
||||
}
|
||||
|
||||
async fn freeze_inmem_layer(&self, write_lock_held: bool) {
|
||||
async fn freeze_inmem_layer(&self, write_lock_held: bool, at: Lsn) {
|
||||
// Freeze the current open in-memory layer. It will be written to disk on next
|
||||
// iteration.
|
||||
|
||||
@@ -3184,14 +3155,15 @@ impl Timeline {
|
||||
Some(self.write_lock.lock().await)
|
||||
};
|
||||
|
||||
self.freeze_inmem_layer_at(self.get_last_record_lsn()).await;
|
||||
}
|
||||
let mut layers_guard = self.layers.write().await;
|
||||
let open_layer = self.open_layer.load_full();
|
||||
if let Some(open_layer) = open_layer {
|
||||
open_layer.freeze(at).await;
|
||||
layers_guard.track_frozen_layer(open_layer);
|
||||
}
|
||||
|
||||
async fn freeze_inmem_layer_at(&self, at: Lsn) {
|
||||
let mut guard = self.layers.write().await;
|
||||
guard
|
||||
.try_freeze_in_memory_layer(at, &self.last_freeze_at)
|
||||
.await;
|
||||
self.last_freeze_at.store(at);
|
||||
*self.last_freeze_ts.write().unwrap() = Instant::now();
|
||||
}
|
||||
|
||||
/// Layer flusher task's main loop.
|
||||
@@ -4609,24 +4581,15 @@ struct TimelineWriterState {
|
||||
// Largest Lsn which passed through the current writer
|
||||
max_lsn: Option<Lsn>,
|
||||
// Cached details of the last freeze. Avoids going trough the atomic/lock on every put.
|
||||
cached_last_freeze_at: Lsn,
|
||||
cached_last_freeze_ts: Instant,
|
||||
}
|
||||
|
||||
impl TimelineWriterState {
|
||||
fn new(
|
||||
open_layer: Arc<InMemoryLayer>,
|
||||
current_size: u64,
|
||||
last_freeze_at: Lsn,
|
||||
last_freeze_ts: Instant,
|
||||
) -> Self {
|
||||
fn new(open_layer: Arc<InMemoryLayer>, current_size: u64) -> Self {
|
||||
Self {
|
||||
open_layer,
|
||||
current_size,
|
||||
prev_lsn: None,
|
||||
max_lsn: None,
|
||||
cached_last_freeze_at: last_freeze_at,
|
||||
cached_last_freeze_ts: last_freeze_ts,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -4680,7 +4643,7 @@ impl<'a> TimelineWriter<'a> {
|
||||
value.ser_into(&mut buf)?;
|
||||
let buf_size: u64 = buf.len().try_into().expect("oversized value buf");
|
||||
|
||||
let action = self.get_open_layer_action(lsn, buf_size);
|
||||
let action = self.get_open_layer_action(lsn, buf_size).await;
|
||||
let layer = self.handle_open_layer_action(lsn, action).await?;
|
||||
let res = layer.put_value(key, lsn, &buf, ctx).await;
|
||||
|
||||
@@ -4706,6 +4669,7 @@ impl<'a> TimelineWriter<'a> {
|
||||
action: OpenLayerAction,
|
||||
) -> anyhow::Result<&Arc<InMemoryLayer>> {
|
||||
match action {
|
||||
// TODO: Move this into InMemoryLayer
|
||||
OpenLayerAction::Roll => {
|
||||
let freeze_at = self.write_guard.as_ref().unwrap().max_lsn.unwrap();
|
||||
self.roll_layer(freeze_at).await?;
|
||||
@@ -4724,14 +4688,8 @@ impl<'a> TimelineWriter<'a> {
|
||||
let layer = self.tl.get_layer_for_write(at).await?;
|
||||
let initial_size = layer.size().await?;
|
||||
|
||||
let last_freeze_at = self.last_freeze_at.load();
|
||||
let last_freeze_ts = *self.last_freeze_ts.read().unwrap();
|
||||
self.write_guard.replace(TimelineWriterState::new(
|
||||
layer,
|
||||
initial_size,
|
||||
last_freeze_at,
|
||||
last_freeze_ts,
|
||||
));
|
||||
self.write_guard
|
||||
.replace(TimelineWriterState::new(layer, initial_size));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -4739,11 +4697,7 @@ impl<'a> TimelineWriter<'a> {
|
||||
async fn roll_layer(&mut self, freeze_at: Lsn) -> anyhow::Result<()> {
|
||||
assert!(self.write_guard.is_some());
|
||||
|
||||
self.tl.freeze_inmem_layer_at(freeze_at).await;
|
||||
|
||||
let now = Instant::now();
|
||||
*(self.last_freeze_ts.write().unwrap()) = now;
|
||||
|
||||
self.tl.freeze_inmem_layer(true, freeze_at).await;
|
||||
self.tl.flush_frozen_layers();
|
||||
|
||||
let current_size = self.write_guard.as_ref().unwrap().current_size;
|
||||
@@ -4754,7 +4708,7 @@ impl<'a> TimelineWriter<'a> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_open_layer_action(&self, lsn: Lsn, new_value_size: u64) -> OpenLayerAction {
|
||||
async fn get_open_layer_action(&self, lsn: Lsn, new_value_size: u64) -> OpenLayerAction {
|
||||
let state = &*self.write_guard;
|
||||
let Some(state) = &state else {
|
||||
return OpenLayerAction::Open;
|
||||
@@ -4771,14 +4725,7 @@ impl<'a> TimelineWriter<'a> {
|
||||
return OpenLayerAction::None;
|
||||
}
|
||||
|
||||
if self.tl.should_roll(
|
||||
state.current_size,
|
||||
state.current_size + new_value_size,
|
||||
self.get_checkpoint_distance(),
|
||||
lsn,
|
||||
state.cached_last_freeze_at,
|
||||
state.cached_last_freeze_ts,
|
||||
) {
|
||||
if state.open_layer.should_roll(new_value_size, lsn).await {
|
||||
OpenLayerAction::Roll
|
||||
} else {
|
||||
OpenLayerAction::None
|
||||
@@ -4800,12 +4747,10 @@ impl<'a> TimelineWriter<'a> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn delete_batch(&mut self, batch: &[(Range<Key>, Lsn)]) -> anyhow::Result<()> {
|
||||
if let Some((_, lsn)) = batch.first() {
|
||||
let action = self.get_open_layer_action(*lsn, 0);
|
||||
let layer = self.handle_open_layer_action(*lsn, action).await?;
|
||||
layer.put_tombstones(batch).await?;
|
||||
}
|
||||
pub(crate) async fn delete(&mut self, range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
|
||||
let action = self.get_open_layer_action(lsn, 0).await;
|
||||
let layer = self.handle_open_layer_action(lsn, action).await?;
|
||||
layer.put_tombstones(range, lsn).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -4818,11 +4763,11 @@ impl<'a> TimelineWriter<'a> {
|
||||
/// 'lsn' must be aligned. This wakes up any wait_lsn() callers waiting for
|
||||
/// the 'lsn' or anything older. The previous last record LSN is stored alongside
|
||||
/// the latest and can be read.
|
||||
pub(crate) fn finish_write(&self, new_lsn: Lsn) {
|
||||
pub fn finish_write(&self, new_lsn: Lsn) {
|
||||
self.tl.finish_write(new_lsn);
|
||||
}
|
||||
|
||||
pub(crate) fn update_current_logical_size(&self, delta: i64) {
|
||||
pub fn update_current_logical_size(&self, delta: i64) {
|
||||
self.tl.update_current_logical_size(delta)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,14 +1,7 @@
|
||||
use anyhow::{bail, ensure, Context, Result};
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use anyhow::Context;
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
use tracing::trace;
|
||||
use utils::{
|
||||
id::TimelineId,
|
||||
lsn::{AtomicLsn, Lsn},
|
||||
};
|
||||
|
||||
use crate::{
|
||||
config::PageServerConf,
|
||||
metrics::TimelineMetrics,
|
||||
tenant::{
|
||||
layer_map::{BatchedUpdates, LayerMap},
|
||||
@@ -42,101 +35,18 @@ impl LayerManager {
|
||||
/// Called from `load_layer_map`. Initialize the layer manager with:
|
||||
/// 1. all on-disk layers
|
||||
/// 2. next open layer (with disk disk_consistent_lsn LSN)
|
||||
pub(crate) fn initialize_local_layers(
|
||||
&mut self,
|
||||
on_disk_layers: Vec<Layer>,
|
||||
next_open_layer_at: Lsn,
|
||||
) {
|
||||
pub(crate) fn initialize_local_layers(&mut self, on_disk_layers: Vec<Layer>) {
|
||||
let mut updates = self.layer_map.batch_update();
|
||||
for layer in on_disk_layers {
|
||||
Self::insert_historic_layer(layer, &mut updates, &mut self.layer_fmgr);
|
||||
}
|
||||
updates.flush();
|
||||
self.layer_map.next_open_layer_at = Some(next_open_layer_at);
|
||||
}
|
||||
|
||||
/// Initialize when creating a new timeline, called in `init_empty_layer_map`.
|
||||
pub(crate) fn initialize_empty(&mut self, next_open_layer_at: Lsn) {
|
||||
self.layer_map.next_open_layer_at = Some(next_open_layer_at);
|
||||
}
|
||||
|
||||
/// Open a new writable layer to append data if there is no open layer, otherwise return the current open layer,
|
||||
/// called within `get_layer_for_write`.
|
||||
pub(crate) async fn get_layer_for_write(
|
||||
&mut self,
|
||||
lsn: Lsn,
|
||||
last_record_lsn: Lsn,
|
||||
conf: &'static PageServerConf,
|
||||
timeline_id: TimelineId,
|
||||
tenant_shard_id: TenantShardId,
|
||||
) -> Result<Arc<InMemoryLayer>> {
|
||||
ensure!(lsn.is_aligned());
|
||||
|
||||
ensure!(
|
||||
lsn > last_record_lsn,
|
||||
"cannot modify relation after advancing last_record_lsn (incoming_lsn={}, last_record_lsn={})",
|
||||
lsn,
|
||||
last_record_lsn,
|
||||
);
|
||||
|
||||
// Do we have a layer open for writing already?
|
||||
let layer = if let Some(open_layer) = &self.layer_map.open_layer {
|
||||
if open_layer.get_lsn_range().start > lsn {
|
||||
bail!(
|
||||
"unexpected open layer in the future: open layers starts at {}, write lsn {}",
|
||||
open_layer.get_lsn_range().start,
|
||||
lsn
|
||||
);
|
||||
}
|
||||
|
||||
Arc::clone(open_layer)
|
||||
} else {
|
||||
// No writeable layer yet. Create one.
|
||||
let start_lsn = self
|
||||
.layer_map
|
||||
.next_open_layer_at
|
||||
.context("No next open layer found")?;
|
||||
|
||||
trace!(
|
||||
"creating in-memory layer at {}/{} for record at {}",
|
||||
timeline_id,
|
||||
start_lsn,
|
||||
lsn
|
||||
);
|
||||
|
||||
let new_layer =
|
||||
InMemoryLayer::create(conf, timeline_id, tenant_shard_id, start_lsn).await?;
|
||||
let layer = Arc::new(new_layer);
|
||||
|
||||
self.layer_map.open_layer = Some(layer.clone());
|
||||
self.layer_map.next_open_layer_at = None;
|
||||
|
||||
layer
|
||||
};
|
||||
|
||||
Ok(layer)
|
||||
}
|
||||
|
||||
/// Called from `freeze_inmem_layer`, returns true if successfully frozen.
|
||||
pub(crate) async fn try_freeze_in_memory_layer(
|
||||
&mut self,
|
||||
Lsn(last_record_lsn): Lsn,
|
||||
last_freeze_at: &AtomicLsn,
|
||||
) {
|
||||
let end_lsn = Lsn(last_record_lsn + 1);
|
||||
|
||||
if let Some(open_layer) = &self.layer_map.open_layer {
|
||||
let open_layer_rc = Arc::clone(open_layer);
|
||||
// Does this layer need freezing?
|
||||
open_layer.freeze(end_lsn).await;
|
||||
|
||||
// The layer is no longer open, update the layer map to reflect this.
|
||||
// We will replace it with on-disk historics below.
|
||||
self.layer_map.frozen_layers.push_back(open_layer_rc);
|
||||
self.layer_map.open_layer = None;
|
||||
self.layer_map.next_open_layer_at = Some(end_lsn);
|
||||
last_freeze_at.store(end_lsn);
|
||||
}
|
||||
pub(crate) fn track_frozen_layer(&mut self, frozen_layer: Arc<InMemoryLayer>) {
|
||||
frozen_layer.assert_frozen();
|
||||
self.layer_map.frozen_layers.push_back(frozen_layer);
|
||||
}
|
||||
|
||||
/// Add image layers to the layer map, called from `create_image_layers`.
|
||||
|
||||
@@ -53,7 +53,6 @@ pub struct WalReceiverConf {
|
||||
pub max_lsn_wal_lag: NonZeroU64,
|
||||
pub auth_token: Option<Arc<String>>,
|
||||
pub availability_zone: Option<String>,
|
||||
pub ingest_batch_size: u64,
|
||||
}
|
||||
|
||||
pub struct WalReceiver {
|
||||
|
||||
@@ -450,7 +450,6 @@ impl ConnectionManagerState {
|
||||
|
||||
let node_id = new_sk.safekeeper_id;
|
||||
let connect_timeout = self.conf.wal_connect_timeout;
|
||||
let ingest_batch_size = self.conf.ingest_batch_size;
|
||||
let timeline = Arc::clone(&self.timeline);
|
||||
let ctx = ctx.detached_child(
|
||||
TaskKind::WalReceiverConnectionHandler,
|
||||
@@ -470,7 +469,6 @@ impl ConnectionManagerState {
|
||||
connect_timeout,
|
||||
ctx,
|
||||
node_id,
|
||||
ingest_batch_size,
|
||||
)
|
||||
.await;
|
||||
|
||||
@@ -1407,7 +1405,6 @@ mod tests {
|
||||
max_lsn_wal_lag: NonZeroU64::new(1024 * 1024).unwrap(),
|
||||
auth_token: None,
|
||||
availability_zone: None,
|
||||
ingest_batch_size: 1,
|
||||
},
|
||||
wal_connection: None,
|
||||
wal_stream_candidates: HashMap::new(),
|
||||
|
||||
@@ -106,7 +106,6 @@ impl From<WalDecodeError> for WalReceiverError {
|
||||
|
||||
/// Open a connection to the given safekeeper and receive WAL, sending back progress
|
||||
/// messages as we go.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(super) async fn handle_walreceiver_connection(
|
||||
timeline: Arc<Timeline>,
|
||||
wal_source_connconf: PgConnectionConfig,
|
||||
@@ -115,7 +114,6 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
connect_timeout: Duration,
|
||||
ctx: RequestContext,
|
||||
node: NodeId,
|
||||
ingest_batch_size: u64,
|
||||
) -> Result<(), WalReceiverError> {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
@@ -318,9 +316,7 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
|
||||
{
|
||||
let mut decoded = DecodedWALRecord::default();
|
||||
let mut modification = timeline.begin_modification(startlsn);
|
||||
let mut uncommitted_records = 0;
|
||||
let mut filtered_records = 0;
|
||||
let mut modification = timeline.begin_modification(endlsn);
|
||||
while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
|
||||
// It is important to deal with the aligned records as lsn in getPage@LSN is
|
||||
// aligned and can be several bytes bigger. Without this alignment we are
|
||||
@@ -329,40 +325,14 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
return Err(WalReceiverError::Other(anyhow!("LSN not aligned")));
|
||||
}
|
||||
|
||||
// Ingest the records without immediately committing them.
|
||||
let ingested = walingest
|
||||
walingest
|
||||
.ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx)
|
||||
.await
|
||||
.with_context(|| format!("could not ingest record at {lsn}"))?;
|
||||
if !ingested {
|
||||
tracing::debug!("ingest: filtered out record @ LSN {lsn}");
|
||||
WAL_INGEST.records_filtered.inc();
|
||||
filtered_records += 1;
|
||||
}
|
||||
|
||||
fail_point!("walreceiver-after-ingest");
|
||||
|
||||
last_rec_lsn = lsn;
|
||||
|
||||
// Commit every ingest_batch_size records. Even if we filtered out
|
||||
// all records, we still need to call commit to advance the LSN.
|
||||
uncommitted_records += 1;
|
||||
if uncommitted_records >= ingest_batch_size {
|
||||
WAL_INGEST
|
||||
.records_committed
|
||||
.inc_by(uncommitted_records - filtered_records);
|
||||
modification.commit(&ctx).await?;
|
||||
uncommitted_records = 0;
|
||||
filtered_records = 0;
|
||||
}
|
||||
}
|
||||
|
||||
// Commit the remaining records.
|
||||
if uncommitted_records > 0 {
|
||||
WAL_INGEST
|
||||
.records_committed
|
||||
.inc_by(uncommitted_records - filtered_records);
|
||||
modification.commit(&ctx).await?;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -33,7 +33,7 @@ use utils::failpoint_support;
|
||||
|
||||
use crate::context::RequestContext;
|
||||
use crate::metrics::WAL_INGEST;
|
||||
use crate::pgdatadir_mapping::{DatadirModification, Version};
|
||||
use crate::pgdatadir_mapping::DatadirModification;
|
||||
use crate::tenant::PageReconstructError;
|
||||
use crate::tenant::Timeline;
|
||||
use crate::walrecord::*;
|
||||
@@ -49,18 +49,20 @@ use postgres_ffi::TransactionId;
|
||||
use postgres_ffi::BLCKSZ;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
pub struct WalIngest {
|
||||
pub struct WalIngest<'a> {
|
||||
shard: ShardIdentity,
|
||||
timeline: &'a Timeline,
|
||||
|
||||
checkpoint: CheckPoint,
|
||||
checkpoint_modified: bool,
|
||||
}
|
||||
|
||||
impl WalIngest {
|
||||
impl<'a> WalIngest<'a> {
|
||||
pub async fn new(
|
||||
timeline: &Timeline,
|
||||
timeline: &'a Timeline,
|
||||
startpoint: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<WalIngest> {
|
||||
ctx: &'_ RequestContext,
|
||||
) -> anyhow::Result<WalIngest<'a>> {
|
||||
// Fetch the latest checkpoint into memory, so that we can compare with it
|
||||
// quickly in `ingest_record` and update it when it changes.
|
||||
let checkpoint_bytes = timeline.get_checkpoint(startpoint, ctx).await?;
|
||||
@@ -69,6 +71,7 @@ impl WalIngest {
|
||||
|
||||
Ok(WalIngest {
|
||||
shard: *timeline.get_shard_identity(),
|
||||
timeline,
|
||||
checkpoint,
|
||||
checkpoint_modified: false,
|
||||
})
|
||||
@@ -82,8 +85,6 @@ impl WalIngest {
|
||||
/// Helper function to parse a WAL record and call the Timeline's PUT functions for all the
|
||||
/// relations/pages that the record affects.
|
||||
///
|
||||
/// This function returns `true` if the record was ingested, and `false` if it was filtered out
|
||||
///
|
||||
pub async fn ingest_record(
|
||||
&mut self,
|
||||
recdata: Bytes,
|
||||
@@ -91,13 +92,11 @@ impl WalIngest {
|
||||
modification: &mut DatadirModification<'_>,
|
||||
decoded: &mut DecodedWALRecord,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<bool> {
|
||||
) -> anyhow::Result<()> {
|
||||
WAL_INGEST.records_received.inc();
|
||||
let pg_version = modification.tline.pg_version;
|
||||
let prev_len = modification.len();
|
||||
|
||||
modification.set_lsn(lsn)?;
|
||||
decode_wal_record(recdata, decoded, pg_version)?;
|
||||
modification.lsn = lsn;
|
||||
decode_wal_record(recdata, decoded, self.timeline.pg_version)?;
|
||||
|
||||
let mut buf = decoded.record.clone();
|
||||
buf.advance(decoded.main_data_offset);
|
||||
@@ -138,9 +137,9 @@ impl WalIngest {
|
||||
}
|
||||
pg_constants::RM_DBASE_ID => {
|
||||
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
|
||||
debug!(%info, %pg_version, "handle RM_DBASE_ID");
|
||||
debug!(%info, pg_version=%self.timeline.pg_version, "handle RM_DBASE_ID");
|
||||
|
||||
if pg_version == 14 {
|
||||
if self.timeline.pg_version == 14 {
|
||||
if info == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE {
|
||||
let createdb = XlCreateDatabase::decode(&mut buf);
|
||||
debug!("XLOG_DBASE_CREATE v14");
|
||||
@@ -156,7 +155,7 @@ impl WalIngest {
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
} else if pg_version == 15 {
|
||||
} else if self.timeline.pg_version == 15 {
|
||||
if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG {
|
||||
debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
|
||||
} else if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY {
|
||||
@@ -176,7 +175,7 @@ impl WalIngest {
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
} else if pg_version == 16 {
|
||||
} else if self.timeline.pg_version == 16 {
|
||||
if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG {
|
||||
debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
|
||||
} else if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY {
|
||||
@@ -423,11 +422,19 @@ impl WalIngest {
|
||||
self.checkpoint_modified = false;
|
||||
}
|
||||
|
||||
// Note that at this point this record is only cached in the modification
|
||||
// until commit() is called to flush the data into the repository and update
|
||||
// the latest LSN.
|
||||
if modification.is_empty() {
|
||||
tracing::debug!("ingest: filtered out record @ LSN {lsn}");
|
||||
WAL_INGEST.records_filtered.inc();
|
||||
modification.tline.finish_write(lsn);
|
||||
} else {
|
||||
WAL_INGEST.records_committed.inc();
|
||||
modification.commit(ctx).await?;
|
||||
}
|
||||
|
||||
Ok(modification.len() > prev_len)
|
||||
// Now that this record has been fully handled, including updating the
|
||||
// checkpoint data, let the repository know that it is up-to-date to this LSN.
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Do not store this block, but observe it for the purposes of updating our relation size state.
|
||||
@@ -474,7 +481,7 @@ impl WalIngest {
|
||||
&& (decoded.xl_info == pg_constants::XLOG_FPI
|
||||
|| decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
|
||||
// compression of WAL is not yet supported: fall back to storing the original WAL record
|
||||
&& !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, modification.tline.pg_version)?
|
||||
&& !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, self.timeline.pg_version)?
|
||||
// do not materialize null pages because them most likely be soon replaced with real data
|
||||
&& blk.bimg_len != 0
|
||||
{
|
||||
@@ -527,7 +534,7 @@ impl WalIngest {
|
||||
let mut old_heap_blkno: Option<u32> = None;
|
||||
let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
|
||||
|
||||
match modification.tline.pg_version {
|
||||
match self.timeline.pg_version {
|
||||
14 => {
|
||||
if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
|
||||
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
|
||||
@@ -751,7 +758,7 @@ impl WalIngest {
|
||||
// replaying it would fail to find the previous image of the page, because
|
||||
// it doesn't exist. So check if the VM page(s) exist, and skip the WAL
|
||||
// record if it doesn't.
|
||||
let vm_size = get_relsize(modification, vm_rel, ctx).await?;
|
||||
let vm_size = self.get_relsize(vm_rel, modification.lsn, ctx).await?;
|
||||
if let Some(blknum) = new_vm_blk {
|
||||
if blknum >= vm_size {
|
||||
new_vm_blk = None;
|
||||
@@ -832,11 +839,10 @@ impl WalIngest {
|
||||
let mut new_heap_blkno: Option<u32> = None;
|
||||
let mut old_heap_blkno: Option<u32> = None;
|
||||
let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
|
||||
let pg_version = modification.tline.pg_version;
|
||||
|
||||
assert_eq!(decoded.xl_rmid, pg_constants::RM_NEON_ID);
|
||||
|
||||
match pg_version {
|
||||
match self.timeline.pg_version {
|
||||
16 => {
|
||||
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
|
||||
|
||||
@@ -899,7 +905,7 @@ impl WalIngest {
|
||||
}
|
||||
_ => bail!(
|
||||
"Neon RMGR has no known compatibility with PostgreSQL version {}",
|
||||
pg_version
|
||||
self.timeline.pg_version
|
||||
),
|
||||
}
|
||||
|
||||
@@ -922,7 +928,7 @@ impl WalIngest {
|
||||
// replaying it would fail to find the previous image of the page, because
|
||||
// it doesn't exist. So check if the VM page(s) exist, and skip the WAL
|
||||
// record if it doesn't.
|
||||
let vm_size = get_relsize(modification, vm_rel, ctx).await?;
|
||||
let vm_size = self.get_relsize(vm_rel, modification.lsn, ctx).await?;
|
||||
if let Some(blknum) = new_vm_blk {
|
||||
if blknum >= vm_size {
|
||||
new_vm_blk = None;
|
||||
@@ -1000,14 +1006,16 @@ impl WalIngest {
|
||||
let src_db_id = rec.src_db_id;
|
||||
let src_tablespace_id = rec.src_tablespace_id;
|
||||
|
||||
// Creating a database is implemented by copying the template (aka. source) database.
|
||||
// To copy all the relations, we need to ask for the state as of the same LSN, but we
|
||||
// cannot pass 'lsn' to the Timeline.get_* functions, or they will block waiting for
|
||||
// the last valid LSN to advance up to it. So we use the previous record's LSN in the
|
||||
// get calls instead.
|
||||
let req_lsn = modification.tline.get_last_record_lsn();
|
||||
|
||||
let rels = modification
|
||||
.tline
|
||||
.list_rels(
|
||||
src_tablespace_id,
|
||||
src_db_id,
|
||||
Version::Modified(modification),
|
||||
ctx,
|
||||
)
|
||||
.list_rels(src_tablespace_id, src_db_id, req_lsn, ctx)
|
||||
.await?;
|
||||
|
||||
debug!("ingest_xlog_dbase_create: {} rels", rels.len());
|
||||
@@ -1015,12 +1023,7 @@ impl WalIngest {
|
||||
// Copy relfilemap
|
||||
let filemap = modification
|
||||
.tline
|
||||
.get_relmap_file(
|
||||
src_tablespace_id,
|
||||
src_db_id,
|
||||
Version::Modified(modification),
|
||||
ctx,
|
||||
)
|
||||
.get_relmap_file(src_tablespace_id, src_db_id, req_lsn, ctx)
|
||||
.await?;
|
||||
modification
|
||||
.put_relmap_file(tablespace_id, db_id, filemap, ctx)
|
||||
@@ -1034,7 +1037,7 @@ impl WalIngest {
|
||||
|
||||
let nblocks = modification
|
||||
.tline
|
||||
.get_rel_size(src_rel, Version::Modified(modification), true, ctx)
|
||||
.get_rel_size(src_rel, req_lsn, true, ctx)
|
||||
.await?;
|
||||
let dst_rel = RelTag {
|
||||
spcnode: tablespace_id,
|
||||
@@ -1068,13 +1071,7 @@ impl WalIngest {
|
||||
|
||||
let content = modification
|
||||
.tline
|
||||
.get_rel_page_at_lsn(
|
||||
src_rel,
|
||||
blknum,
|
||||
Version::Modified(modification),
|
||||
true,
|
||||
ctx,
|
||||
)
|
||||
.get_rel_page_at_lsn(src_rel, blknum, req_lsn, true, ctx)
|
||||
.await?;
|
||||
modification.put_rel_page_image(dst_rel, blknum, content)?;
|
||||
num_blocks_copied += 1;
|
||||
@@ -1145,7 +1142,7 @@ impl WalIngest {
|
||||
modification.put_rel_page_image(rel, fsm_physical_page_no, ZERO_PAGE.clone())?;
|
||||
fsm_physical_page_no += 1;
|
||||
}
|
||||
let nblocks = get_relsize(modification, rel, ctx).await?;
|
||||
let nblocks = self.get_relsize(rel, modification.lsn, ctx).await?;
|
||||
if nblocks > fsm_physical_page_no {
|
||||
// check if something to do: FSM is larger than truncate position
|
||||
self.put_rel_truncation(modification, rel, fsm_physical_page_no, ctx)
|
||||
@@ -1167,7 +1164,7 @@ impl WalIngest {
|
||||
modification.put_rel_page_image(rel, vm_page_no, ZERO_PAGE.clone())?;
|
||||
vm_page_no += 1;
|
||||
}
|
||||
let nblocks = get_relsize(modification, rel, ctx).await?;
|
||||
let nblocks = self.get_relsize(rel, modification.lsn, ctx).await?;
|
||||
if nblocks > vm_page_no {
|
||||
// check if something to do: VM is larger than truncate position
|
||||
self.put_rel_truncation(modification, rel, vm_page_no, ctx)
|
||||
@@ -1240,9 +1237,10 @@ impl WalIngest {
|
||||
dbnode: xnode.dbnode,
|
||||
relnode: xnode.relnode,
|
||||
};
|
||||
let last_lsn = self.timeline.get_last_record_lsn();
|
||||
if modification
|
||||
.tline
|
||||
.get_rel_exists(rel, Version::Modified(modification), true, ctx)
|
||||
.get_rel_exists(rel, last_lsn, true, ctx)
|
||||
.await?
|
||||
{
|
||||
self.put_rel_drop(modification, rel, ctx).await?;
|
||||
@@ -1296,9 +1294,10 @@ impl WalIngest {
|
||||
// will block waiting for the last valid LSN to advance up to
|
||||
// it. So we use the previous record's LSN in the get calls
|
||||
// instead.
|
||||
let req_lsn = modification.tline.get_last_record_lsn();
|
||||
for segno in modification
|
||||
.tline
|
||||
.list_slru_segments(SlruKind::Clog, Version::Modified(modification), ctx)
|
||||
.list_slru_segments(SlruKind::Clog, req_lsn, ctx)
|
||||
.await?
|
||||
{
|
||||
let segpage = segno * pg_constants::SLRU_PAGES_PER_SEGMENT;
|
||||
@@ -1516,6 +1515,20 @@ impl WalIngest {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn get_relsize(
|
||||
&mut self,
|
||||
rel: RelTag,
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<BlockNumber> {
|
||||
let nblocks = if !self.timeline.get_rel_exists(rel, lsn, true, ctx).await? {
|
||||
0
|
||||
} else {
|
||||
self.timeline.get_rel_size(rel, lsn, true, ctx).await?
|
||||
};
|
||||
Ok(nblocks)
|
||||
}
|
||||
|
||||
async fn handle_rel_extend(
|
||||
&mut self,
|
||||
modification: &mut DatadirModification<'_>,
|
||||
@@ -1527,6 +1540,7 @@ impl WalIngest {
|
||||
// Check if the relation exists. We implicitly create relations on first
|
||||
// record.
|
||||
// TODO: would be nice if to be more explicit about it
|
||||
let last_lsn = modification.lsn;
|
||||
|
||||
// Get current size and put rel creation if rel doesn't exist
|
||||
//
|
||||
@@ -1534,14 +1548,11 @@ impl WalIngest {
|
||||
// check the cache too. This is because eagerly checking the cache results in
|
||||
// less work overall and 10% better performance. It's more work on cache miss
|
||||
// but cache miss is rare.
|
||||
let old_nblocks = if let Some(nblocks) = modification
|
||||
.tline
|
||||
.get_cached_rel_size(&rel, modification.get_lsn())
|
||||
{
|
||||
let old_nblocks = if let Some(nblocks) = self.timeline.get_cached_rel_size(&rel, last_lsn) {
|
||||
nblocks
|
||||
} else if !modification
|
||||
.tline
|
||||
.get_rel_exists(rel, Version::Modified(modification), true, ctx)
|
||||
} else if !self
|
||||
.timeline
|
||||
.get_rel_exists(rel, last_lsn, true, ctx)
|
||||
.await?
|
||||
{
|
||||
// create it with 0 size initially, the logic below will extend it
|
||||
@@ -1551,10 +1562,7 @@ impl WalIngest {
|
||||
.context("Relation Error")?;
|
||||
0
|
||||
} else {
|
||||
modification
|
||||
.tline
|
||||
.get_rel_size(rel, Version::Modified(modification), true, ctx)
|
||||
.await?
|
||||
self.timeline.get_rel_size(rel, last_lsn, true, ctx).await?
|
||||
};
|
||||
|
||||
if new_nblocks > old_nblocks {
|
||||
@@ -1607,9 +1615,10 @@ impl WalIngest {
|
||||
// Check if the relation exists. We implicitly create relations on first
|
||||
// record.
|
||||
// TODO: would be nice if to be more explicit about it
|
||||
let old_nblocks = if !modification
|
||||
.tline
|
||||
.get_slru_segment_exists(kind, segno, Version::Modified(modification), ctx)
|
||||
let last_lsn = self.timeline.get_last_record_lsn();
|
||||
let old_nblocks = if !self
|
||||
.timeline
|
||||
.get_slru_segment_exists(kind, segno, last_lsn, ctx)
|
||||
.await?
|
||||
{
|
||||
// create it with 0 size initially, the logic below will extend it
|
||||
@@ -1618,9 +1627,8 @@ impl WalIngest {
|
||||
.await?;
|
||||
0
|
||||
} else {
|
||||
modification
|
||||
.tline
|
||||
.get_slru_segment_size(kind, segno, Version::Modified(modification), ctx)
|
||||
self.timeline
|
||||
.get_slru_segment_size(kind, segno, last_lsn, ctx)
|
||||
.await?
|
||||
};
|
||||
|
||||
@@ -1643,26 +1651,6 @@ impl WalIngest {
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_relsize(
|
||||
modification: &DatadirModification<'_>,
|
||||
rel: RelTag,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<BlockNumber> {
|
||||
let nblocks = if !modification
|
||||
.tline
|
||||
.get_rel_exists(rel, Version::Modified(modification), true, ctx)
|
||||
.await?
|
||||
{
|
||||
0
|
||||
} else {
|
||||
modification
|
||||
.tline
|
||||
.get_rel_size(rel, Version::Modified(modification), true, ctx)
|
||||
.await?
|
||||
};
|
||||
Ok(nblocks)
|
||||
}
|
||||
|
||||
#[allow(clippy::bool_assert_comparison)]
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
@@ -1687,7 +1675,10 @@ mod tests {
|
||||
|
||||
static ZERO_CHECKPOINT: Bytes = Bytes::from_static(&[0u8; SIZEOF_CHECKPOINT]);
|
||||
|
||||
async fn init_walingest_test(tline: &Timeline, ctx: &RequestContext) -> Result<WalIngest> {
|
||||
async fn init_walingest_test<'a>(
|
||||
tline: &'a Timeline,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<WalIngest<'a>> {
|
||||
let mut m = tline.begin_modification(Lsn(0x10));
|
||||
m.put_checkpoint(ZERO_CHECKPOINT.clone())?;
|
||||
m.put_relmap_file(0, 111, Bytes::from(""), ctx).await?; // dummy relmapper file
|
||||
@@ -1732,29 +1723,29 @@ mod tests {
|
||||
// The relation was created at LSN 2, not visible at LSN 1 yet.
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), false, &ctx)
|
||||
.get_rel_exists(TESTREL_A, Lsn(0x10), false, &ctx)
|
||||
.await?,
|
||||
false
|
||||
);
|
||||
assert!(tline
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), false, &ctx)
|
||||
.get_rel_size(TESTREL_A, Lsn(0x10), false, &ctx)
|
||||
.await
|
||||
.is_err());
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
|
||||
.get_rel_exists(TESTREL_A, Lsn(0x20), false, &ctx)
|
||||
.await?,
|
||||
true
|
||||
);
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
|
||||
.get_rel_size(TESTREL_A, Lsn(0x20), false, &ctx)
|
||||
.await?,
|
||||
1
|
||||
);
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), false, &ctx)
|
||||
.get_rel_size(TESTREL_A, Lsn(0x50), false, &ctx)
|
||||
.await?,
|
||||
3
|
||||
);
|
||||
@@ -1762,46 +1753,46 @@ mod tests {
|
||||
// Check page contents at each LSN
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x20)), false, &ctx)
|
||||
.get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x20), false, &ctx)
|
||||
.await?,
|
||||
test_img("foo blk 0 at 2")
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x30)), false, &ctx)
|
||||
.get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x30), false, &ctx)
|
||||
.await?,
|
||||
test_img("foo blk 0 at 3")
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x40)), false, &ctx)
|
||||
.get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x40), false, &ctx)
|
||||
.await?,
|
||||
test_img("foo blk 0 at 3")
|
||||
);
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x40)), false, &ctx)
|
||||
.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40), false, &ctx)
|
||||
.await?,
|
||||
test_img("foo blk 1 at 4")
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x50)), false, &ctx)
|
||||
.get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x50), false, &ctx)
|
||||
.await?,
|
||||
test_img("foo blk 0 at 3")
|
||||
);
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x50)), false, &ctx)
|
||||
.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50), false, &ctx)
|
||||
.await?,
|
||||
test_img("foo blk 1 at 4")
|
||||
);
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), false, &ctx)
|
||||
.get_rel_page_at_lsn(TESTREL_A, 2, Lsn(0x50), false, &ctx)
|
||||
.await?,
|
||||
test_img("foo blk 2 at 5")
|
||||
);
|
||||
@@ -1817,19 +1808,19 @@ mod tests {
|
||||
// Check reported size and contents after truncation
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), false, &ctx)
|
||||
.get_rel_size(TESTREL_A, Lsn(0x60), false, &ctx)
|
||||
.await?,
|
||||
2
|
||||
);
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x60)), false, &ctx)
|
||||
.get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x60), false, &ctx)
|
||||
.await?,
|
||||
test_img("foo blk 0 at 3")
|
||||
);
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x60)), false, &ctx)
|
||||
.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60), false, &ctx)
|
||||
.await?,
|
||||
test_img("foo blk 1 at 4")
|
||||
);
|
||||
@@ -1837,13 +1828,13 @@ mod tests {
|
||||
// should still see the truncated block with older LSN
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), false, &ctx)
|
||||
.get_rel_size(TESTREL_A, Lsn(0x50), false, &ctx)
|
||||
.await?,
|
||||
3
|
||||
);
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), false, &ctx)
|
||||
.get_rel_page_at_lsn(TESTREL_A, 2, Lsn(0x50), false, &ctx)
|
||||
.await?,
|
||||
test_img("foo blk 2 at 5")
|
||||
);
|
||||
@@ -1856,7 +1847,7 @@ mod tests {
|
||||
m.commit(&ctx).await?;
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x68)), false, &ctx)
|
||||
.get_rel_size(TESTREL_A, Lsn(0x68), false, &ctx)
|
||||
.await?,
|
||||
0
|
||||
);
|
||||
@@ -1869,19 +1860,19 @@ mod tests {
|
||||
m.commit(&ctx).await?;
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x70)), false, &ctx)
|
||||
.get_rel_size(TESTREL_A, Lsn(0x70), false, &ctx)
|
||||
.await?,
|
||||
2
|
||||
);
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x70)), false, &ctx)
|
||||
.get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x70), false, &ctx)
|
||||
.await?,
|
||||
ZERO_PAGE
|
||||
);
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x70)), false, &ctx)
|
||||
.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x70), false, &ctx)
|
||||
.await?,
|
||||
test_img("foo blk 1")
|
||||
);
|
||||
@@ -1894,21 +1885,21 @@ mod tests {
|
||||
m.commit(&ctx).await?;
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), false, &ctx)
|
||||
.get_rel_size(TESTREL_A, Lsn(0x80), false, &ctx)
|
||||
.await?,
|
||||
1501
|
||||
);
|
||||
for blk in 2..1500 {
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_page_at_lsn(TESTREL_A, blk, Version::Lsn(Lsn(0x80)), false, &ctx)
|
||||
.get_rel_page_at_lsn(TESTREL_A, blk, Lsn(0x80), false, &ctx)
|
||||
.await?,
|
||||
ZERO_PAGE
|
||||
);
|
||||
}
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_page_at_lsn(TESTREL_A, 1500, Version::Lsn(Lsn(0x80)), false, &ctx)
|
||||
.get_rel_page_at_lsn(TESTREL_A, 1500, Lsn(0x80), false, &ctx)
|
||||
.await?,
|
||||
test_img("foo blk 1500")
|
||||
);
|
||||
@@ -1935,13 +1926,13 @@ mod tests {
|
||||
// Check that rel exists and size is correct
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
|
||||
.get_rel_exists(TESTREL_A, Lsn(0x20), false, &ctx)
|
||||
.await?,
|
||||
true
|
||||
);
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
|
||||
.get_rel_size(TESTREL_A, Lsn(0x20), false, &ctx)
|
||||
.await?,
|
||||
1
|
||||
);
|
||||
@@ -1954,7 +1945,7 @@ mod tests {
|
||||
// Check that rel is not visible anymore
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x30)), false, &ctx)
|
||||
.get_rel_exists(TESTREL_A, Lsn(0x30), false, &ctx)
|
||||
.await?,
|
||||
false
|
||||
);
|
||||
@@ -1972,13 +1963,13 @@ mod tests {
|
||||
// Check that rel exists and size is correct
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x40)), false, &ctx)
|
||||
.get_rel_exists(TESTREL_A, Lsn(0x40), false, &ctx)
|
||||
.await?,
|
||||
true
|
||||
);
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x40)), false, &ctx)
|
||||
.get_rel_size(TESTREL_A, Lsn(0x40), false, &ctx)
|
||||
.await?,
|
||||
1
|
||||
);
|
||||
@@ -2011,24 +2002,24 @@ mod tests {
|
||||
// The relation was created at LSN 20, not visible at LSN 1 yet.
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), false, &ctx)
|
||||
.get_rel_exists(TESTREL_A, Lsn(0x10), false, &ctx)
|
||||
.await?,
|
||||
false
|
||||
);
|
||||
assert!(tline
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), false, &ctx)
|
||||
.get_rel_size(TESTREL_A, Lsn(0x10), false, &ctx)
|
||||
.await
|
||||
.is_err());
|
||||
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
|
||||
.get_rel_exists(TESTREL_A, Lsn(0x20), false, &ctx)
|
||||
.await?,
|
||||
true
|
||||
);
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
|
||||
.get_rel_size(TESTREL_A, Lsn(0x20), false, &ctx)
|
||||
.await?,
|
||||
relsize
|
||||
);
|
||||
@@ -2039,7 +2030,7 @@ mod tests {
|
||||
let data = format!("foo blk {} at {}", blkno, lsn);
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(lsn), false, &ctx)
|
||||
.get_rel_page_at_lsn(TESTREL_A, blkno, lsn, false, &ctx)
|
||||
.await?,
|
||||
test_img(&data)
|
||||
);
|
||||
@@ -2056,7 +2047,7 @@ mod tests {
|
||||
// Check reported size and contents after truncation
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), false, &ctx)
|
||||
.get_rel_size(TESTREL_A, Lsn(0x60), false, &ctx)
|
||||
.await?,
|
||||
1
|
||||
);
|
||||
@@ -2066,7 +2057,7 @@ mod tests {
|
||||
let data = format!("foo blk {} at {}", blkno, lsn);
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x60)), false, &ctx)
|
||||
.get_rel_page_at_lsn(TESTREL_A, blkno, Lsn(0x60), false, &ctx)
|
||||
.await?,
|
||||
test_img(&data)
|
||||
);
|
||||
@@ -2075,7 +2066,7 @@ mod tests {
|
||||
// should still see all blocks with older LSN
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), false, &ctx)
|
||||
.get_rel_size(TESTREL_A, Lsn(0x50), false, &ctx)
|
||||
.await?,
|
||||
relsize
|
||||
);
|
||||
@@ -2084,7 +2075,7 @@ mod tests {
|
||||
let data = format!("foo blk {} at {}", blkno, lsn);
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x50)), false, &ctx)
|
||||
.get_rel_page_at_lsn(TESTREL_A, blkno, Lsn(0x50), false, &ctx)
|
||||
.await?,
|
||||
test_img(&data)
|
||||
);
|
||||
@@ -2104,13 +2095,13 @@ mod tests {
|
||||
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x80)), false, &ctx)
|
||||
.get_rel_exists(TESTREL_A, Lsn(0x80), false, &ctx)
|
||||
.await?,
|
||||
true
|
||||
);
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), false, &ctx)
|
||||
.get_rel_size(TESTREL_A, Lsn(0x80), false, &ctx)
|
||||
.await?,
|
||||
relsize
|
||||
);
|
||||
@@ -2120,7 +2111,7 @@ mod tests {
|
||||
let data = format!("foo blk {} at {}", blkno, lsn);
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x80)), false, &ctx)
|
||||
.get_rel_page_at_lsn(TESTREL_A, blkno, Lsn(0x80), false, &ctx)
|
||||
.await?,
|
||||
test_img(&data)
|
||||
);
|
||||
@@ -2153,9 +2144,7 @@ mod tests {
|
||||
assert_current_logical_size(&tline, Lsn(lsn));
|
||||
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), false, &ctx)
|
||||
.await?,
|
||||
tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
|
||||
RELSEG_SIZE + 1
|
||||
);
|
||||
|
||||
@@ -2167,9 +2156,7 @@ mod tests {
|
||||
.await?;
|
||||
m.commit(&ctx).await?;
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), false, &ctx)
|
||||
.await?,
|
||||
tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
|
||||
RELSEG_SIZE
|
||||
);
|
||||
assert_current_logical_size(&tline, Lsn(lsn));
|
||||
@@ -2182,9 +2169,7 @@ mod tests {
|
||||
.await?;
|
||||
m.commit(&ctx).await?;
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), false, &ctx)
|
||||
.await?,
|
||||
tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
|
||||
RELSEG_SIZE - 1
|
||||
);
|
||||
assert_current_logical_size(&tline, Lsn(lsn));
|
||||
@@ -2200,9 +2185,7 @@ mod tests {
|
||||
.await?;
|
||||
m.commit(&ctx).await?;
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), false, &ctx)
|
||||
.await?,
|
||||
tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
|
||||
size as BlockNumber
|
||||
);
|
||||
|
||||
@@ -2239,7 +2222,7 @@ mod tests {
|
||||
let wal_segment_path = format!("{path}/000000010000000000000001.zst");
|
||||
let source_initdb_path = format!("{path}/{INITDB_PATH}");
|
||||
let startpoint = Lsn::from_hex("14AEC08").unwrap();
|
||||
let _endpoint = Lsn::from_hex("1FFFF98").unwrap();
|
||||
let endpoint = Lsn::from_hex("1FFFF98").unwrap();
|
||||
|
||||
let harness = TenantHarness::create("test_ingest_real_wal").unwrap();
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
@@ -2282,7 +2265,7 @@ mod tests {
|
||||
let mut walingest = WalIngest::new(tline.as_ref(), startpoint, &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
let mut modification = tline.begin_modification(startpoint);
|
||||
let mut modification = tline.begin_modification(endpoint);
|
||||
let mut decoded = DecodedWALRecord::default();
|
||||
println!("decoding {} bytes", bytes.len() - xlogoff);
|
||||
|
||||
@@ -2296,7 +2279,6 @@ mod tests {
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
modification.commit(&ctx).await.unwrap();
|
||||
}
|
||||
|
||||
let duration = started_at.elapsed();
|
||||
|
||||
2
vendor/postgres-v14
vendored
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: a7b4c66156...3b09894ddb
2
vendor/postgres-v15
vendored
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: 64b8c7bccc...80cef885ad
2
vendor/postgres-v16
vendored
2
vendor/postgres-v16
vendored
Submodule vendor/postgres-v16 updated: 3946b2e2ea...9007894722
Reference in New Issue
Block a user