diff --git a/pgxn/neon/file_cache.c b/pgxn/neon/file_cache.c
index e555e069d0..91f5eb272a 100644
--- a/pgxn/neon/file_cache.c
+++ b/pgxn/neon/file_cache.c
@@ -647,18 +647,25 @@ lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
 	return found;
 }
 
+#if PG_MAJORVERSION_NUM >= 16
+static PGIOAlignedBlock voidblock = {0};
+#else
+static PGAlignedBlock voidblock = {0};
+#endif
+#define SCRIBBLEPAGE (&voidblock.data)
+
 /*
  * Try to read pages from local cache.
  * Returns the number of pages read from the local cache, and sets bits in
- * 'read' for the pages which were read. This may scribble over buffers not
- * marked in 'read', so be careful with operation ordering.
+ * 'mask' for the pages which were read. This may scribble over buffers not
+ * marked in 'mask', so be careful with operation ordering.
  *
  * In case of error local file cache is disabled (lfc->limit is set to zero),
- * and -1 is returned. Note that 'read' and the buffers may be touched and in
- * an otherwise invalid state.
+ * and -1 is returned.
  *
- * If the mask argument is supplied, bits will be set at the offsets of pages
- * that were present and read from the LFC.
+ * If the mask argument is supplied, we'll only try to read those pages which
+ * don't have their bits set on entry. At exit, pages which were successfully
+ * read from LFC will have their bits set.
  */
 int
 lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
@@ -693,23 +700,43 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
 	while (nblocks > 0)
 	{
 		struct iovec iov[PG_IOV_MAX];
-		int			chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1);
+		int8		chunk_mask[BLOCKS_PER_CHUNK / 8] = {0};
+		int			chunk_offs = (blkno & (BLOCKS_PER_CHUNK - 1));
 		int			blocks_in_chunk = Min(nblocks, BLOCKS_PER_CHUNK - (blkno % BLOCKS_PER_CHUNK));
 		int			iteration_hits = 0;
 		int			iteration_misses = 0;
 		uint64		io_time_us = 0;
-		int			n_blocks_to_read = 0;
+		int			n_blocks_to_read = 0;
+		int			iov_last_used = 0;
+		int			first_block_in_chunk_read = -1;
 		ConditionVariable* cv;
 
 		Assert(blocks_in_chunk > 0);
 
 		for (int i = 0; i < blocks_in_chunk; i++)
 		{
-			n_blocks_to_read += (BITMAP_ISSET(mask, buf_offset + i) != 0);
-			iov[i].iov_base = buffers[buf_offset + i];
 			iov[i].iov_len = BLCKSZ;
-			BITMAP_CLR(mask, buf_offset + i);
+			/* mask not set = we must do work */
+			if (!BITMAP_ISSET(mask, buf_offset + i))
+			{
+				iov[i].iov_base = buffers[buf_offset + i];
+				n_blocks_to_read++;
+				iov_last_used = i + 1;
+
+				if (first_block_in_chunk_read == -1)
+				{
+					first_block_in_chunk_read = i;
+				}
+			}
+			/* mask set = we must do no work */
+			else
+			{
+				/* don't scribble on pages we weren't requested to write to */
+				iov[i].iov_base = SCRIBBLEPAGE;
+			}
 		}
+
+		/* shortcut IO */
 		if (n_blocks_to_read == 0)
 		{
 			buf_offset += blocks_in_chunk;
@@ -718,6 +745,12 @@
 			continue;
 		}
 
+		/*
+		 * The effective iov size must be >= the number of blocks we're about
+		 * to read.
+		 */
+		Assert(iov_last_used - first_block_in_chunk_read >= n_blocks_to_read);
+
 		tag.blockNum = blkno - chunk_offs;
 		hash = get_hash_value(lfc_hash, &tag);
 		cv = &lfc_ctl->cv[hash % N_COND_VARS];
@@ -762,10 +795,15 @@
 		generation = lfc_ctl->generation;
 		entry_offset = entry->offset;
 
-		for (int i = 0; i < blocks_in_chunk; i++)
+		for (int i = first_block_in_chunk_read; i < iov_last_used; i++)
 		{
 			FileCacheBlockState state = UNAVAILABLE;
 			bool		sleeping = false;
+
+			/* no need to work on something we're not interested in */
+			if (BITMAP_ISSET(mask, buf_offset + i))
+				continue;
+
 			while (lfc_ctl->generation == generation)
 			{
 				state = GET_STATE(entry, chunk_offs + i);
@@ -789,7 +827,7 @@
 			}
 			if (state == AVAILABLE)
 			{
-				BITMAP_SET(mask, buf_offset + i);
+				BITMAP_SET(chunk_mask, i);
 				iteration_hits++;
 			}
 			else
@@ -801,16 +839,34 @@
 		if (iteration_hits != 0)
 		{
+			/* chunk offset (# of pages) into the LFC file */
+			off_t		first_read_offset = (off_t) entry_offset * BLOCKS_PER_CHUNK;
+			int			nwrite = iov_last_used - first_block_in_chunk_read;
+
+			/* offset of first IOV */
+			first_read_offset += chunk_offs + first_block_in_chunk_read;
+
 			pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_READ);
-			rc = preadv(lfc_desc, iov, blocks_in_chunk,
-						((off_t) entry_offset * BLOCKS_PER_CHUNK + chunk_offs) * BLCKSZ);
+
+			/* Read only the blocks we're interested in */
+			rc = preadv(lfc_desc, &iov[first_block_in_chunk_read],
+						nwrite, first_read_offset * BLCKSZ);
 			pgstat_report_wait_end();
-			if (rc != (BLCKSZ * blocks_in_chunk))
+			if (rc != (BLCKSZ * nwrite))
 			{
 				lfc_disable("read");
 				return -1;
 			}
+
+			/*
+			 * We successfully read the pages we know were valid when we
+			 * started reading; now mark those pages as read
+			 */
+			for (int i = first_block_in_chunk_read; i < iov_last_used; i++)
+			{
+				if (BITMAP_ISSET(chunk_mask, i))
+					BITMAP_SET(mask, buf_offset + i);
+			}
 		}
 
 		/* Place entry to the head of LRU list */
diff --git a/pgxn/neon/pagestore_client.h b/pgxn/neon/pagestore_client.h
index 475697f9c0..68f7430343 100644
--- a/pgxn/neon/pagestore_client.h
+++ b/pgxn/neon/pagestore_client.h
@@ -315,7 +315,7 @@ static inline bool
 lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
 		 void *buffer)
 {
-	bits8		rv = 1;
+	bits8		rv = 0;
 
 	return lfc_readv_select(rinfo, forkNum, blkno, &buffer, 1, &rv) == 1;
 }
diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c
index ddcee74ff3..2424a5fcb6 100644
--- a/pgxn/neon/pagestore_smgr.c
+++ b/pgxn/neon/pagestore_smgr.c
@@ -1081,6 +1081,9 @@ prefetch_lookup(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkn, neon_r
  *			pageserver. If NULL, we utilize the lastWrittenLsn -infrastructure
  *			to calculate the LSNs to send.
  *
+ * Bits set in *mask (if present) indicate pages already read; i.e. pages we
+ * can skip in this process.
+ *
  * When performing a prefetch rather than a synchronous request,
  * is_prefetch==true. Currently, it only affects how the request is accounted
  * in the perf counters.
@@ -1126,7 +1129,7 @@ Retry:
 		uint64		ring_index;
 		neon_request_lsns *lsns;
 
-		if (PointerIsValid(mask) && !BITMAP_ISSET(mask, i))
+		if (PointerIsValid(mask) && BITMAP_ISSET(mask, i))
 			continue;
 
 		if (frlsns)
@@ -3026,9 +3029,6 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 			tag.blockNum = blocknum;
 
-			for (int i = 0; i < PG_IOV_MAX / 8; i++)
-				lfc_present[i] = ~(lfc_present[i]);
-
 			ring_index = prefetch_register_bufferv(tag, NULL, iterblocks,
 												   lfc_present, true);
@@ -3134,6 +3134,15 @@ neon_writeback(SMgrRelation reln, ForkNumber forknum,
 #endif
 }
 
+/*
+ * Read N pages at a specific LSN.
+ *
+ * *mask is set for pages read at a previous point in time, and which we
+ * should not touch or overwrite.
+ * New bits should be set in *mask for the pages we successfully read.
+ *
+ * The offsets in request_lsns, buffers, and mask are linked.
+ */
 static void
 #if PG_MAJORVERSION_NUM < 16
 neon_read_at_lsnv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber base_blockno, neon_request_lsns *request_lsns,
@@ -3186,7 +3195,7 @@ neon_read_at_lsnv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber base_block
 		neon_request_lsns *reqlsns = &request_lsns[i];
 		TimestampTz start_ts, end_ts;
 
-		if (PointerIsValid(mask) && !BITMAP_ISSET(mask, i))
+		if (PointerIsValid(mask) && BITMAP_ISSET(mask, i))
 			continue;
 
 		start_ts = GetCurrentTimestamp();
@@ -3485,9 +3494,7 @@ static void
 neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 		   void **buffers, BlockNumber nblocks)
 {
-	bits8		prefetch_hits[PG_IOV_MAX / 8] = {0};
-	bits8		lfc_hits[PG_IOV_MAX / 8];
-	bits8		read[PG_IOV_MAX / 8];
+	bits8		read_pages[PG_IOV_MAX / 8];
 	neon_request_lsns request_lsns[PG_IOV_MAX];
 	int			lfc_result;
 	int			prefetch_result;
@@ -3519,19 +3526,18 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 	neon_get_request_lsns(InfoFromSMgrRel(reln), forknum, blocknum,
 						  request_lsns, nblocks);
+	memset(read_pages, 0, sizeof(read_pages));
 
-	prefetch_result = prefetch_lookupv(InfoFromSMgrRel(reln), forknum, blocknum, request_lsns, nblocks, buffers, prefetch_hits);
+	prefetch_result = prefetch_lookupv(InfoFromSMgrRel(reln), forknum,
+									   blocknum, request_lsns, nblocks,
+									   buffers, read_pages);
 
 	if (prefetch_result == nblocks)
 		return;
 
-	/* invert the result: exclude prefetched blocks */
-	for (int i = 0; i < PG_IOV_MAX / 8; i++)
-		lfc_hits[i] = ~prefetch_hits[i];
-
 	/* Try to read from local file cache */
 	lfc_result = lfc_readv_select(InfoFromSMgrRel(reln), forknum, blocknum, buffers,
-								  nblocks, lfc_hits);
+								  nblocks, read_pages);
 
 	if (lfc_result > 0)
 		MyNeonCounters->file_cache_hits_total += lfc_result;
@@ -3540,21 +3546,8 @@
 	if (prefetch_result + lfc_result == nblocks)
 		return;
 
-	if (lfc_result <= 0)
-	{
-		/* can't use the LFC result, so read all blocks from PS */
-		for (int i = 0; i < PG_IOV_MAX / 8; i++)
-			read[i] = ~prefetch_hits[i];
-	}
-	else
-	{
-		/* invert the result: exclude blocks read from lfc */
-		for (int i = 0; i < PG_IOV_MAX / 8; i++)
-			read[i] = ~(prefetch_hits[i] | lfc_hits[i]);
-	}
-
 	neon_read_at_lsnv(InfoFromSMgrRel(reln), forknum, blocknum, request_lsns,
-					  buffers, nblocks, read);
+					  buffers, nblocks, read_pages);
 
 	/*
 	 * Try to receive prefetch results once again just to make sure we don't
	 * leave the smgr code while the OS might still have buffered bytes.
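
A sketch of the staged-commit pattern lfc_readv_select() now uses: hits are
recorded in a chunk-local bitmap first and merged into the caller's mask only
after the read succeeds, which is why a failed preadv() can return -1 without
leaving the caller's bitmap half-updated. Illustration only, not part of the
patch; the BITMAP_* macros and read_chunk() below are simplified stand-ins for
the extension's versions, and the IO is simulated.

#include <assert.h>
#include <stdint.h>

typedef uint8_t bits8;

#define BITMAP_ISSET(m, i) (((m)[(i) >> 3] & (1 << ((i) & 7))) != 0)
#define BITMAP_SET(m, i)   ((m)[(i) >> 3] |= (1 << ((i) & 7)))

/* Returns hits, or -1 on IO error; '*mask' is untouched on error. */
static int
read_chunk(int nblocks, bits8 *mask, int io_fails)
{
	bits8		chunk_mask[4] = {0};
	int			hits = 0;

	for (int i = 0; i < nblocks; i++)
	{
		if (BITMAP_ISSET(mask, i))
			continue;				/* bit set on entry: page already read */
		BITMAP_SET(chunk_mask, i);	/* stage the hit locally */
		hits++;
	}

	if (io_fails)					/* simulated short read from preadv() */
		return -1;

	/* IO succeeded: commit staged hits to the caller-visible mask */
	for (int i = 0; i < nblocks; i++)
		if (BITMAP_ISSET(chunk_mask, i))
			BITMAP_SET(mask, i);
	return hits;
}

int
main(void)
{
	bits8		mask[4] = {0};

	BITMAP_SET(mask, 3);					/* page 3 was read earlier */
	assert(read_chunk(8, mask, 1) == -1);	/* IO error: mask unchanged */
	assert(!BITMAP_ISSET(mask, 0));
	assert(read_chunk(8, mask, 0) == 7);	/* all pages except page 3 */
	assert(BITMAP_ISSET(mask, 0) && BITMAP_ISSET(mask, 3));
	return 0;
}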
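The preadv() narrowing is the subtle part of the patch: the iovec still has a
slot per block in the chunk, but the syscall covers only the contiguous range
[first_block_in_chunk_read, iov_last_used), and any masked slot inside that
range points at one shared scratch page, so the kernel never writes into a
caller buffer we were told to leave alone. A self-contained sketch under
simplified assumptions (4 KiB pages, a throwaway file, error handling
omitted); SCRIBBLEPAGE is modeled here by a plain static buffer.

#define _GNU_SOURCE
#include <fcntl.h>
#include <stdlib.h>
#include <string.h>
#include <sys/uio.h>
#include <unistd.h>

#define PAGESZ 4096
#define NPAGES 4

static char scribble[PAGESZ];	/* stand-in for SCRIBBLEPAGE */

int
main(void)
{
	struct iovec iov[NPAGES];
	char	   *bufs[NPAGES];
	int			skip[NPAGES] = {1, 0, 1, 0};	/* 1 = already read, keep */
	int			first = -1, last = 0;
	int			fd = open("/tmp/lfc_demo.bin", O_RDWR | O_CREAT | O_TRUNC, 0600);
	char		page[PAGESZ];

	/* seed the "cache file" with four recognizable pages: AAA.., BBB.. */
	for (int i = 0; i < NPAGES; i++)
	{
		memset(page, 'A' + i, PAGESZ);
		write(fd, page, PAGESZ);
	}

	for (int i = 0; i < NPAGES; i++)
	{
		bufs[i] = malloc(PAGESZ);
		memset(bufs[i], '.', PAGESZ);
		iov[i].iov_len = PAGESZ;
		if (skip[i])
			iov[i].iov_base = scribble;		/* don't scribble on the caller */
		else
		{
			iov[i].iov_base = bufs[i];
			if (first == -1)
				first = i;
			last = i + 1;
		}
	}

	/* one syscall for [first, last); the masked page 2 lands in scribble */
	preadv(fd, &iov[first], last - first, (off_t) first * PAGESZ);

	/* now bufs[1][0] == 'B', bufs[3][0] == 'D'; bufs[0], bufs[2] untouched */
	close(fd);
	unlink("/tmp/lfc_demo.bin");
	return 0;
}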
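Finally, the neon_readv() rewrite restated in miniature: because prefetch,
LFC, and pageserver reads now all honor the same "set bit = page already read"
convention, one bitmap flows through all three layers and the old inversion
loops (and the separate prefetch_hits/lfc_hits/read arrays) disappear. Toy
model below; try_prefetch() and try_lfc() are hypothetical stand-ins for
prefetch_lookupv() and lfc_readv_select(), with hard-coded hit patterns.

#include <stdint.h>
#include <stdio.h>

typedef uint8_t bits8;

#define BITMAP_ISSET(m, i) (((m)[(i) >> 3] & (1 << ((i) & 7))) != 0)
#define BITMAP_SET(m, i)   ((m)[(i) >> 3] |= (1 << ((i) & 7)))

/* stand-in for prefetch_lookupv(): pretend pages 0 and 1 were prefetched */
static int
try_prefetch(int n, bits8 *mask)
{
	int			hits = 0;

	for (int i = 0; i < n && i < 2; i++)
		if (!BITMAP_ISSET(mask, i))
		{
			BITMAP_SET(mask, i);
			hits++;
		}
	return hits;
}

/* stand-in for lfc_readv_select(): pretend even-numbered pages are cached */
static int
try_lfc(int n, bits8 *mask)
{
	int			hits = 0;

	for (int i = 0; i < n; i++)
		if (!BITMAP_ISSET(mask, i) && i % 2 == 0)
		{
			BITMAP_SET(mask, i);
			hits++;
		}
	return hits;
}

int
main(void)
{
	bits8		read_pages[4] = {0};	/* one bitmap for every layer */
	int			nblocks = 8;

	if (try_prefetch(nblocks, read_pages) == nblocks)
		return 0;

	/* no inversion between layers: the same bitmap goes straight through */
	(void) try_lfc(nblocks, read_pages);

	/* whatever is still unmarked goes to the pageserver: pages 3, 5, 7 */
	for (int i = 0; i < nblocks; i++)
		if (!BITMAP_ISSET(read_pages, i))
			printf("page %d -> pageserver\n", i);
	return 0;
}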