mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-16 04:30:38 +00:00
Fix LSN tracking on "unlogged index builds"
Fixes the test_gin_redo.py test failure, and probably some others
This commit is contained in:
@@ -192,6 +192,10 @@ struct RelEntry {
|
||||
/// cached size of the relation
|
||||
/// u32::MAX means 'not known' (that's InvalidBlockNumber in Postgres)
|
||||
nblocks: AtomicU32,
|
||||
|
||||
/// This is the last time the "metadata" of this relation changed, not
|
||||
/// the contents of the blocks. That is, the size of the relation.
|
||||
lw_lsn: AtomicLsn,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for RelEntry {
|
||||
@@ -338,7 +342,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
|
||||
CacheResult::NotFound(lsn)
|
||||
}
|
||||
|
||||
pub fn remember_rel_size(&'t self, rel: &RelTag, nblocks: u32) {
|
||||
pub fn remember_rel_size(&'t self, rel: &RelTag, nblocks: u32, lsn: Lsn) {
|
||||
match self.relsize_cache.entry(RelKey::from(rel)) {
|
||||
Entry::Vacant(e) => {
|
||||
tracing::info!("inserting rel entry for {rel:?}, {nblocks} blocks");
|
||||
@@ -346,12 +350,14 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
|
||||
_ = e
|
||||
.insert(RelEntry {
|
||||
nblocks: AtomicU32::new(nblocks),
|
||||
lw_lsn: AtomicLsn::new(lsn.0),
|
||||
})
|
||||
.expect("out of memory");
|
||||
}
|
||||
Entry::Occupied(e) => {
|
||||
tracing::info!("updating rel entry for {rel:?}, {nblocks} blocks");
|
||||
e.get().nblocks.store(nblocks, Ordering::Relaxed);
|
||||
e.get().lw_lsn.store(lsn);
|
||||
}
|
||||
};
|
||||
}
|
||||
@@ -515,10 +521,15 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
|
||||
}
|
||||
|
||||
/// Forget information about given relation in the cache. (For DROP TABLE and such)
|
||||
pub fn forget_rel(&'t self, rel: &RelTag) {
|
||||
pub fn forget_rel(&'t self, rel: &RelTag, _nblocks: Option<u32>, flush_lsn: Lsn) {
|
||||
tracing::info!("forgetting rel entry for {rel:?}");
|
||||
self.relsize_cache.remove(&RelKey::from(rel));
|
||||
|
||||
// update with flush LSN
|
||||
let _ = self
|
||||
.global_lw_lsn
|
||||
.fetch_max(flush_lsn.0, Ordering::Relaxed);
|
||||
|
||||
// also forget all cached blocks for the relation
|
||||
// FIXME
|
||||
/*
|
||||
|
||||
@@ -28,6 +28,9 @@ pub enum NeonIORequest {
|
||||
RelCreate(CRelCreateRequest),
|
||||
RelTruncate(CRelTruncateRequest),
|
||||
RelUnlink(CRelUnlinkRequest),
|
||||
|
||||
// Other requests
|
||||
ForgetCache(CForgetCacheRequest),
|
||||
}
|
||||
|
||||
#[repr(C)]
|
||||
@@ -72,6 +75,7 @@ impl NeonIORequest {
|
||||
RelCreate(req) => req.request_id,
|
||||
RelTruncate(req) => req.request_id,
|
||||
RelUnlink(req) => req.request_id,
|
||||
ForgetCache(req) => req.request_id,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -187,7 +191,6 @@ pub struct CPrefetchVRequest {
|
||||
pub struct CDbSizeRequest {
|
||||
pub request_id: u64,
|
||||
pub db_oid: COid,
|
||||
pub request_lsn: CLsn,
|
||||
}
|
||||
|
||||
#[repr(C)]
|
||||
@@ -241,6 +244,7 @@ pub struct CRelCreateRequest {
|
||||
pub db_oid: COid,
|
||||
pub rel_number: u32,
|
||||
pub fork_number: u8,
|
||||
pub lsn: CLsn,
|
||||
}
|
||||
|
||||
#[repr(C)]
|
||||
@@ -252,6 +256,7 @@ pub struct CRelTruncateRequest {
|
||||
pub rel_number: u32,
|
||||
pub fork_number: u8,
|
||||
pub nblocks: u32,
|
||||
pub lsn: CLsn,
|
||||
}
|
||||
|
||||
#[repr(C)]
|
||||
@@ -262,8 +267,7 @@ pub struct CRelUnlinkRequest {
|
||||
pub db_oid: COid,
|
||||
pub rel_number: u32,
|
||||
pub fork_number: u8,
|
||||
pub block_number: u32,
|
||||
pub nblocks: u32,
|
||||
pub lsn: CLsn,
|
||||
}
|
||||
|
||||
impl CRelExistsRequest {
|
||||
@@ -375,3 +379,27 @@ impl CRelUnlinkRequest {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#[repr(C)]
|
||||
#[derive(Copy, Clone, Debug)]
|
||||
pub struct CForgetCacheRequest {
|
||||
pub request_id: u64,
|
||||
pub spc_oid: COid,
|
||||
pub db_oid: COid,
|
||||
pub rel_number: u32,
|
||||
pub fork_number: u8,
|
||||
pub nblocks: u32,
|
||||
pub lsn: CLsn,
|
||||
}
|
||||
|
||||
impl CForgetCacheRequest {
|
||||
pub fn reltag(&self) -> page_api::RelTag {
|
||||
page_api::RelTag {
|
||||
spcnode: self.spc_oid,
|
||||
dbnode: self.db_oid,
|
||||
relnode: self.rel_number,
|
||||
forknum: self.fork_number,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -359,8 +359,8 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
|
||||
{
|
||||
Ok(nblocks) => {
|
||||
// update the cache
|
||||
tracing::info!("updated relsize for {:?} in cache: {}", rel, nblocks);
|
||||
self.cache.remember_rel_size(&rel, nblocks);
|
||||
tracing::info!("updated relsize for {:?} in cache: {}, lsn {}", rel, nblocks, read_lsn);
|
||||
self.cache.remember_rel_size(&rel, nblocks, not_modified_since);
|
||||
|
||||
NeonIOResult::RelSize(nblocks)
|
||||
}
|
||||
@@ -457,7 +457,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
|
||||
.remember_page(&rel, req.block_number, req.src, Lsn(req.lsn), true)
|
||||
.await;
|
||||
self.cache
|
||||
.remember_rel_size(&req.reltag(), req.block_number + 1);
|
||||
.remember_rel_size(&req.reltag(), req.block_number + 1, Lsn(req.lsn));
|
||||
NeonIOResult::WriteOK
|
||||
}
|
||||
NeonIORequest::RelZeroExtend(req) => {
|
||||
@@ -466,31 +466,37 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
|
||||
.inc_by(req.nblocks as u64);
|
||||
|
||||
// TODO: need to grab an io-in-progress lock for this? I guess not
|
||||
// TODO: I think we should put the empty pages to the cache, or at least
|
||||
// update the last-written LSN.
|
||||
// TODO: We could put the empty pages to the cache. Maybe have
|
||||
// a marker on the block entries for all-zero pages, instead of
|
||||
// actually storing the empty pages.
|
||||
self.cache
|
||||
.remember_rel_size(&req.reltag(), req.block_number + req.nblocks);
|
||||
.remember_rel_size(&req.reltag(), req.block_number + req.nblocks, Lsn(req.lsn));
|
||||
NeonIOResult::WriteOK
|
||||
}
|
||||
NeonIORequest::RelCreate(req) => {
|
||||
self.request_rel_create_counter.inc();
|
||||
|
||||
// TODO: need to grab an io-in-progress lock for this? I guess not
|
||||
self.cache.remember_rel_size(&req.reltag(), 0);
|
||||
self.cache.remember_rel_size(&req.reltag(), 0, Lsn(req.lsn));
|
||||
NeonIOResult::WriteOK
|
||||
}
|
||||
NeonIORequest::RelTruncate(req) => {
|
||||
self.request_rel_truncate_counter.inc();
|
||||
|
||||
// TODO: need to grab an io-in-progress lock for this? I guess not
|
||||
self.cache.remember_rel_size(&req.reltag(), req.nblocks);
|
||||
self.cache.remember_rel_size(&req.reltag(), req.nblocks, Lsn(req.lsn));
|
||||
NeonIOResult::WriteOK
|
||||
}
|
||||
NeonIORequest::RelUnlink(req) => {
|
||||
self.request_rel_unlink_counter.inc();
|
||||
|
||||
// TODO: need to grab an io-in-progress lock for this? I guess not
|
||||
self.cache.forget_rel(&req.reltag());
|
||||
self.cache.forget_rel(&req.reltag(), None, Lsn(req.lsn));
|
||||
NeonIOResult::WriteOK
|
||||
}
|
||||
NeonIORequest::ForgetCache(req) => {
|
||||
// TODO: need to grab an io-in-progress lock for this? I guess not
|
||||
self.cache.forget_rel(&req.reltag(), Some(req.nblocks), Lsn(req.lsn));
|
||||
NeonIOResult::WriteOK
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1061,7 +1061,7 @@ communicator_new_rel_zeroextend(NRelFileInfo rinfo, ForkNumber forkNum, BlockNum
|
||||
}
|
||||
|
||||
void
|
||||
communicator_new_rel_create(NRelFileInfo rinfo, ForkNumber forkNum)
|
||||
communicator_new_rel_create(NRelFileInfo rinfo, ForkNumber forkNum, XLogRecPtr lsn)
|
||||
{
|
||||
NeonIORequest request = {
|
||||
.tag = NeonIORequest_RelCreate,
|
||||
@@ -1071,6 +1071,7 @@ communicator_new_rel_create(NRelFileInfo rinfo, ForkNumber forkNum)
|
||||
.db_oid = NInfoGetDbOid(rinfo),
|
||||
.rel_number = NInfoGetRelNumber(rinfo),
|
||||
.fork_number = forkNum,
|
||||
.lsn = lsn,
|
||||
}
|
||||
};
|
||||
NeonIOResult result;
|
||||
@@ -1093,7 +1094,7 @@ communicator_new_rel_create(NRelFileInfo rinfo, ForkNumber forkNum)
|
||||
}
|
||||
|
||||
void
|
||||
communicator_new_rel_truncate(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber nblocks)
|
||||
communicator_new_rel_truncate(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber nblocks, XLogRecPtr lsn)
|
||||
{
|
||||
NeonIORequest request = {
|
||||
.tag = NeonIORequest_RelTruncate,
|
||||
@@ -1104,6 +1105,7 @@ communicator_new_rel_truncate(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumbe
|
||||
.rel_number = NInfoGetRelNumber(rinfo),
|
||||
.fork_number = forkNum,
|
||||
.nblocks = nblocks,
|
||||
.lsn = lsn,
|
||||
}
|
||||
};
|
||||
NeonIOResult result;
|
||||
@@ -1126,7 +1128,7 @@ communicator_new_rel_truncate(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumbe
|
||||
}
|
||||
|
||||
void
|
||||
communicator_new_rel_unlink(NRelFileInfo rinfo, ForkNumber forkNum)
|
||||
communicator_new_rel_unlink(NRelFileInfo rinfo, ForkNumber forkNum, XLogRecPtr lsn)
|
||||
{
|
||||
NeonIORequest request = {
|
||||
.tag = NeonIORequest_RelUnlink,
|
||||
@@ -1136,6 +1138,7 @@ communicator_new_rel_unlink(NRelFileInfo rinfo, ForkNumber forkNum)
|
||||
.db_oid = NInfoGetDbOid(rinfo),
|
||||
.rel_number = NInfoGetRelNumber(rinfo),
|
||||
.fork_number = forkNum,
|
||||
.lsn = lsn,
|
||||
}
|
||||
};
|
||||
NeonIOResult result;
|
||||
@@ -1157,6 +1160,39 @@ communicator_new_rel_unlink(NRelFileInfo rinfo, ForkNumber forkNum)
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
communicator_new_forget_cache(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber nblocks, XLogRecPtr lsn)
|
||||
{
|
||||
NeonIORequest request = {
|
||||
.tag = NeonIORequest_ForgetCache,
|
||||
.forget_cache = {
|
||||
.request_id = assign_request_id(),
|
||||
.spc_oid = NInfoGetSpcOid(rinfo),
|
||||
.db_oid = NInfoGetDbOid(rinfo),
|
||||
.rel_number = NInfoGetRelNumber(rinfo),
|
||||
.fork_number = forkNum,
|
||||
.nblocks = nblocks,
|
||||
.lsn = lsn,
|
||||
}
|
||||
};
|
||||
NeonIOResult result;
|
||||
|
||||
perform_request(&request, &result);
|
||||
switch (result.tag)
|
||||
{
|
||||
case NeonIOResult_WriteOK:
|
||||
return;
|
||||
case NeonIOResult_Error:
|
||||
ereport(ERROR,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not forget cache for rel %u/%u/%u.%u: %s",
|
||||
RelFileInfoFmt(rinfo), forkNum, pg_strerror(result.error))));
|
||||
break;
|
||||
default:
|
||||
elog(ERROR, "unexpected result for ForgetCache operation: %d", result.tag);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/* Debugging functions */
|
||||
|
||||
|
||||
@@ -49,8 +49,9 @@ extern void communicator_new_rel_extend(NRelFileInfo rinfo, ForkNumber forkNum,
|
||||
extern void communicator_new_rel_zeroextend(NRelFileInfo rinfo, ForkNumber forkNum,
|
||||
BlockNumber blockno, BlockNumber nblocks,
|
||||
XLogRecPtr lsn);
|
||||
extern void communicator_new_rel_create(NRelFileInfo rinfo, ForkNumber forkNum);
|
||||
extern void communicator_new_rel_truncate(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber nblocks);
|
||||
extern void communicator_new_rel_unlink(NRelFileInfo rinfo, ForkNumber forkNum);
|
||||
extern void communicator_new_rel_create(NRelFileInfo rinfo, ForkNumber forkNum, XLogRecPtr lsn);
|
||||
extern void communicator_new_rel_truncate(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber nblocks, XLogRecPtr lsn);
|
||||
extern void communicator_new_rel_unlink(NRelFileInfo rinfo, ForkNumber forkNum, XLogRecPtr lsn);
|
||||
extern void communicator_new_forget_cache(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber nblocks, XLogRecPtr lsn);
|
||||
|
||||
#endif /* COMMUNICATOR_NEW_H */
|
||||
|
||||
@@ -292,6 +292,7 @@ extern int64 neon_dbsize(Oid dbNode);
|
||||
extern void neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum,
|
||||
BlockNumber blkno, neon_request_lsns *output,
|
||||
BlockNumber nblocks);
|
||||
extern XLogRecPtr neon_get_write_lsn(void);
|
||||
|
||||
/* utils for neon relsize cache */
|
||||
extern void relsize_hash_init(void);
|
||||
|
||||
@@ -502,6 +502,37 @@ nm_adjust_lsn(XLogRecPtr lsn)
|
||||
return lsn;
|
||||
}
|
||||
|
||||
/*
|
||||
* Get a LSN to use to stamp an operation like relation create or truncate.
|
||||
* On operations on individual pages we use the LSN of the page, but when
|
||||
* e.g. smgrcreate() is called, we have to do something else.
|
||||
*/
|
||||
XLogRecPtr
|
||||
neon_get_write_lsn(void)
|
||||
{
|
||||
XLogRecPtr lsn;
|
||||
|
||||
if (RecoveryInProgress())
|
||||
{
|
||||
/*
|
||||
* FIXME: v14 doesn't have GetCurrentReplayRecPtr(). Options:
|
||||
* - add it in our fork
|
||||
* - store a magic value that means that you must use
|
||||
* current latest possible LSN at the time that the request
|
||||
* on this thing is made again (or some other recent enough
|
||||
* lsn).
|
||||
*/
|
||||
#if PG_VERSION_NUM >= 150000
|
||||
lsn = GetCurrentReplayRecPtr(NULL);
|
||||
#else
|
||||
lsn = GetXLogReplayRecPtr(NULL); /* FIXME: this is wrong, see above */
|
||||
#endif
|
||||
}
|
||||
else
|
||||
lsn = GetXLogInsertRecPtr();
|
||||
|
||||
return lsn;
|
||||
}
|
||||
|
||||
/*
|
||||
* Return LSN for requesting pages and number of blocks from page server
|
||||
@@ -824,13 +855,15 @@ neon_create(SMgrRelation reln, ForkNumber forkNum, bool isRedo)
|
||||
|
||||
if (neon_enable_new_communicator)
|
||||
{
|
||||
XLogRecPtr lsn = neon_get_write_lsn();
|
||||
|
||||
if (isRedo)
|
||||
{
|
||||
if (!communicator_new_rel_exists(InfoFromSMgrRel(reln), forkNum))
|
||||
communicator_new_rel_create(InfoFromSMgrRel(reln), forkNum);
|
||||
communicator_new_rel_create(InfoFromSMgrRel(reln), forkNum, lsn);
|
||||
}
|
||||
else
|
||||
communicator_new_rel_create(InfoFromSMgrRel(reln), forkNum);
|
||||
communicator_new_rel_create(InfoFromSMgrRel(reln), forkNum, lsn);
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -902,7 +935,9 @@ neon_unlink(NRelFileInfoBackend rinfo, ForkNumber forkNum, bool isRedo)
|
||||
{
|
||||
if (neon_enable_new_communicator)
|
||||
{
|
||||
communicator_new_rel_unlink(InfoFromNInfoB(rinfo), forkNum);
|
||||
XLogRecPtr lsn = neon_get_write_lsn();
|
||||
|
||||
communicator_new_rel_unlink(InfoFromNInfoB(rinfo), forkNum, lsn);
|
||||
}
|
||||
else
|
||||
forget_cached_relsize(InfoFromNInfoB(rinfo), forkNum);
|
||||
@@ -1962,7 +1997,9 @@ neon_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber old_blocks, Blo
|
||||
|
||||
if (neon_enable_new_communicator)
|
||||
{
|
||||
communicator_new_rel_truncate(InfoFromSMgrRel(reln), forknum, nblocks);
|
||||
XLogRecPtr lsn = neon_get_write_lsn();
|
||||
|
||||
communicator_new_rel_truncate(InfoFromSMgrRel(reln), forknum, nblocks, lsn);
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -2226,12 +2263,15 @@ neon_end_unlogged_build(SMgrRelation reln)
|
||||
nblocks = mdnblocks(reln, MAIN_FORKNUM);
|
||||
recptr = GetXLogInsertRecPtr();
|
||||
|
||||
neon_set_lwlsn_block_range(recptr,
|
||||
InfoFromNInfoB(rinfob),
|
||||
MAIN_FORKNUM, 0, nblocks);
|
||||
neon_set_lwlsn_relation(recptr,
|
||||
InfoFromNInfoB(rinfob),
|
||||
MAIN_FORKNUM);
|
||||
if (!neon_enable_new_communicator)
|
||||
{
|
||||
neon_set_lwlsn_block_range(recptr,
|
||||
InfoFromNInfoB(rinfob),
|
||||
MAIN_FORKNUM, 0, nblocks);
|
||||
neon_set_lwlsn_relation(recptr,
|
||||
InfoFromNInfoB(rinfob),
|
||||
MAIN_FORKNUM);
|
||||
}
|
||||
|
||||
/* Remove local copy */
|
||||
for (int forknum = 0; forknum <= MAX_FORKNUM; forknum++)
|
||||
@@ -2240,8 +2280,12 @@ neon_end_unlogged_build(SMgrRelation reln)
|
||||
RelFileInfoFmt(InfoFromNInfoB(rinfob)),
|
||||
forknum);
|
||||
|
||||
// FIXME: also do this with the new communicator
|
||||
if (!neon_enable_new_communicator)
|
||||
if (neon_enable_new_communicator)
|
||||
{
|
||||
/* TODO: we could update the cache with the size, since we have it at hand */
|
||||
communicator_new_forget_cache(InfoFromSMgrRel(reln), forknum, nblocks, recptr);
|
||||
}
|
||||
else
|
||||
{
|
||||
forget_cached_relsize(InfoFromNInfoB(rinfob), forknum);
|
||||
lfc_invalidate(InfoFromNInfoB(rinfob), forknum, nblocks);
|
||||
|
||||
@@ -16,6 +16,7 @@ def test_gin_redo(neon_simple_env: NeonEnv):
|
||||
secondary = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary")
|
||||
con = primary.connect()
|
||||
cur = con.cursor()
|
||||
cur.execute("select pg_switch_wal()")
|
||||
cur.execute("create table gin_test_tbl(id integer, i int4[])")
|
||||
cur.execute("create index gin_test_idx on gin_test_tbl using gin (i)")
|
||||
cur.execute("insert into gin_test_tbl select g,array[3, 1, g] from generate_series(1, 10000) g")
|
||||
|
||||
Reference in New Issue
Block a user