diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index cf97b02258..8bd2621964 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -368,9 +368,474 @@ impl WALRecord { } } -// Public interface functions - impl PageCache { + + // Public GET interface functions + + /// + /// GetPage@LSN + /// + /// Returns an 8k page image + /// + pub async fn get_page_at_lsn(&self, tag: BufferTag, req_lsn: u64) -> anyhow::Result { + self.num_getpage_requests.fetch_add(1, Ordering::Relaxed); + + let lsn = self.wait_lsn(req_lsn).await?; + + // Look up cache entry. If it's a page image, return that. If it's a WAL record, + // ask the WAL redo service to reconstruct the page image from the WAL records. + let minkey = CacheKey { tag, lsn: 0 }; + let maxkey = CacheKey { tag, lsn }; + + let mut buf = BytesMut::new(); + minkey.pack(&mut buf); + + let mut readopts = rocksdb::ReadOptions::default(); + readopts.set_iterate_lower_bound(buf.to_vec()); + + buf.clear(); + maxkey.pack(&mut buf); + let mut iter = self.db.iterator_opt( + rocksdb::IteratorMode::From(&buf[..], rocksdb::Direction::Reverse), + readopts, + ); + let entry_opt = iter.next(); + + if entry_opt.is_none() { + static ZERO_PAGE: [u8; 8192] = [0u8; 8192]; + debug!("Page {:?} at {}({}) not found", tag, req_lsn, lsn); + return Ok(Bytes::from_static(&ZERO_PAGE)); + /* return Err("could not find page image")?; */ + } + let (_k, v) = entry_opt.unwrap(); + buf.clear(); + buf.extend_from_slice(&v); + let content = CacheEntryContent::unpack(&mut buf); + let page_img: Bytes; + if let Some(img) = &content.page_image { + page_img = img.clone(); + } else if content.wal_record.is_some() { + + // Request the WAL redo manager to apply the WAL records for us. + page_img = self.walredo_mgr.request_redo(tag, lsn).await?; + } else { + // No base image, and no WAL record. Huh? + bail!("no page image or WAL record for requested page"); + } + + // FIXME: assumes little-endian. Only used for the debugging log though + let page_lsn_hi = u32::from_le_bytes(page_img.get(0..4).unwrap().try_into().unwrap()); + let page_lsn_lo = u32::from_le_bytes(page_img.get(4..8).unwrap().try_into().unwrap()); + debug!( + "Returning page with LSN {:X}/{:X} for {}/{}/{}.{} blk {}", + page_lsn_hi, + page_lsn_lo, + tag.rel.spcnode, + tag.rel.dbnode, + tag.rel.relnode, + tag.rel.forknum, + tag.blknum + ); + + Ok(page_img) + } + + /// + /// Get size of relation at given LSN. + /// + pub async fn relsize_get(&self, rel: &RelTag, lsn: u64) -> anyhow::Result { + self.wait_lsn(lsn).await?; + return self.relsize_get_nowait(rel, lsn); + } + + /// + /// Does relation exist at given LSN? + /// + pub async fn relsize_exist(&self, rel: &RelTag, req_lsn: u64) -> anyhow::Result { + let lsn = self.wait_lsn(req_lsn).await?; + + let key = CacheKey { + tag: BufferTag { + rel: *rel, + blknum: u32::MAX, + }, + lsn, + }; + let mut buf = BytesMut::new(); + key.pack(&mut buf); + let mut iter = self.db.iterator(rocksdb::IteratorMode::From( + &buf[..], + rocksdb::Direction::Reverse, + )); + if let Some((k, _v)) = iter.next() { + buf.clear(); + buf.extend_from_slice(&k); + let tag = BufferTag::unpack(&mut buf); + if tag.rel == *rel { + debug!("Relation {:?} exists at {}", rel, lsn); + return Ok(true); + } + } + debug!("Relation {:?} doesn't exist at {}", rel, lsn); + Ok(false) + } + + // Other public functions, for updating the page cache. + // These are used by the WAL receiver and WAL redo. + + /// + /// Collect all the WAL records that are needed to reconstruct a page + /// image for the given cache entry. + /// + /// Returns an old page image (if any), and a vector of WAL records to apply + /// over it. + /// + pub fn collect_records_for_apply(&self, tag: BufferTag, lsn: u64) -> (Option, Vec) { + let minkey = CacheKey { + tag: BufferTag { + rel: tag.rel, + blknum: 0, + }, + lsn: 0, + }; + + let mut buf = BytesMut::new(); + minkey.pack(&mut buf); + + let mut readopts = rocksdb::ReadOptions::default(); + readopts.set_iterate_lower_bound(buf.to_vec()); + + let key = CacheKey { + tag: BufferTag { + rel: tag.rel, + blknum: tag.blknum, + }, + lsn: lsn, + }; + buf.clear(); + key.pack(&mut buf); + let iter = self.db.iterator_opt( + rocksdb::IteratorMode::From(&buf[..], rocksdb::Direction::Reverse), + readopts, + ); + + let mut base_img: Option = None; + let mut records: Vec = Vec::new(); + + // Scan backwards, collecting the WAL records, until we hit an + // old page image. + for (_k, v) in iter { + buf.clear(); + buf.extend_from_slice(&v); + let content = CacheEntryContent::unpack(&mut buf); + if let Some(img) = &content.page_image { + // We have a base image. No need to dig deeper into the list of + // records + base_img = Some(img.clone()); + break; + } else if let Some(rec) = &content.wal_record { + records.push(rec.clone()); + + // If this WAL record initializes the page, no need to dig deeper. + if rec.will_init { + break; + } + } else { + panic!("no base image and no WAL record on cache entry"); + } + } + + records.reverse(); + (base_img, records) + } + + /// + /// Adds a WAL record to the page cache + /// + pub fn put_wal_record(&self, tag: BufferTag, rec: WALRecord) { + let lsn = rec.lsn; + let key = CacheKey { tag, lsn }; + + let content = CacheEntryContent { + page_image: None, + wal_record: Some(rec), + apply_pending: false, + }; + + let mut key_buf = BytesMut::new(); + key.pack(&mut key_buf); + let mut val_buf = BytesMut::new(); + content.pack(&mut val_buf); + + let _res = self.db.put(&key_buf[..], &val_buf[..]); + //trace!("put_wal_record lsn: {}", lsn); + + self.num_entries.fetch_add(1, Ordering::Relaxed); + self.num_wal_records.fetch_add(1, Ordering::Relaxed); + } + + /// + /// Adds a relation-wide WAL record (like truncate) to the page cache, + /// associating it with all pages started with specified block number + /// + pub fn put_rel_wal_record(&self, tag: BufferTag, rec: WALRecord) -> anyhow::Result<()> { + let mut key = CacheKey { tag, lsn: rec.lsn }; + + // What was the size of the relation before this record? + let last_lsn = self.last_valid_lsn.load(Ordering::Acquire); + let old_rel_size = self.relsize_get_nowait(&tag.rel, last_lsn)?; + + let content = CacheEntryContent { + page_image: None, + wal_record: Some(rec), + apply_pending: false, + }; + // set new relation size + trace!("Truncate relation {:?}", tag); + let mut key_buf = BytesMut::new(); + let mut val_buf = BytesMut::new(); + content.pack(&mut val_buf); + + for blknum in tag.blknum..old_rel_size { + key_buf.clear(); + key.tag.blknum = blknum; + key.pack(&mut key_buf); + trace!("put_wal_record lsn: {}", key.lsn); + let _res = self.db.put(&key_buf[..], &val_buf[..]); + } + let n = (old_rel_size - tag.blknum) as u64; + self.num_entries.fetch_add(n, Ordering::Relaxed); + self.num_wal_records.fetch_add(n, Ordering::Relaxed); + Ok(()) + } + + /// + /// Memorize a full image of a page version + /// + pub fn put_page_image(&self, tag: BufferTag, lsn: u64, img: Bytes) { + let key = CacheKey { tag, lsn }; + let content = CacheEntryContent { + page_image: Some(img), + wal_record: None, + apply_pending: false, + }; + + let mut key_buf = BytesMut::new(); + key.pack(&mut key_buf); + let mut val_buf = BytesMut::new(); + content.pack(&mut val_buf); + + trace!("put_wal_record lsn: {}", key.lsn); + let _res = self.db.put(&key_buf[..], &val_buf[..]); + + //debug!("inserted page image for {}/{}/{}_{} blk {} at {}", + // tag.spcnode, tag.dbnode, tag.relnode, tag.forknum, tag.blknum, lsn); + self.num_page_images.fetch_add(1, Ordering::Relaxed); + } + + pub fn create_database( + &self, + lsn: u64, + db_id: Oid, + tablespace_id: Oid, + src_db_id: Oid, + src_tablespace_id: Oid, + ) -> anyhow::Result<()> { + let mut buf = BytesMut::new(); + let key = CacheKey { + tag: BufferTag { + rel: RelTag { + spcnode: src_tablespace_id, + dbnode: src_db_id, + relnode: 0, + forknum: 0u8, + }, + blknum: 0, + }, + lsn: 0, + }; + key.pack(&mut buf); + let iter = self.db.iterator(rocksdb::IteratorMode::From( + &buf[..], + rocksdb::Direction::Forward, + )); + let mut n = 0; + for (k, v) in iter { + buf.clear(); + buf.extend_from_slice(&k); + let mut key = CacheKey::unpack(&mut buf); + if key.tag.rel.spcnode != src_tablespace_id || key.tag.rel.dbnode != src_db_id { + break; + } + key.tag.rel.spcnode = tablespace_id; + key.tag.rel.dbnode = db_id; + key.lsn = lsn; + buf.clear(); + key.pack(&mut buf); + + self.db.put(&buf[..], v)?; + n += 1; + } + info!( + "Create database {}/{}, copy {} entries", + tablespace_id, db_id, n + ); + Ok(()) + } + + /// Remember that WAL has been received and added to the page cache up to the given LSN + pub fn advance_last_valid_lsn(&self, lsn: u64) { + let mut shared = self.shared.lock().unwrap(); + + // Can't move backwards. + let oldlsn = shared.last_valid_lsn; + if lsn >= oldlsn { + shared.last_valid_lsn = lsn; + self.seqwait_lsn.advance(lsn); + + self.last_valid_lsn.store(lsn, Ordering::Relaxed); + } else { + warn!( + "attempted to move last valid LSN backwards (was {:X}/{:X}, new {:X}/{:X})", + oldlsn >> 32, + oldlsn & 0xffffffff, + lsn >> 32, + lsn & 0xffffffff + ); + } + } + + /// + /// Remember the (end of) last valid WAL record remembered in the page cache. + /// + /// NOTE: this updates last_valid_lsn as well. + /// + pub fn advance_last_record_lsn(&self, lsn: u64) { + let mut shared = self.shared.lock().unwrap(); + + // Can't move backwards. + assert!(lsn >= shared.last_valid_lsn); + assert!(lsn >= shared.last_record_lsn); + + shared.last_valid_lsn = lsn; + shared.last_record_lsn = lsn; + self.seqwait_lsn.advance(lsn); + + self.last_valid_lsn.store(lsn, Ordering::Relaxed); + self.last_record_lsn.store(lsn, Ordering::Relaxed); + } + + /// + /// Remember the beginning of valid WAL. + /// + /// TODO: This should be called by garbage collection, so that if an older + /// page is requested, we will return an error to the requestor. + pub fn _advance_first_valid_lsn(&self, lsn: u64) { + let mut shared = self.shared.lock().unwrap(); + + // Can't move backwards. + assert!(lsn >= shared.first_valid_lsn); + + // Can't overtake last_valid_lsn (except when we're + // initializing the system and last_valid_lsn hasn't been set yet. + assert!(shared.last_valid_lsn == 0 || lsn < shared.last_valid_lsn); + + shared.first_valid_lsn = lsn; + self.first_valid_lsn.store(lsn, Ordering::Relaxed); + } + + pub fn init_valid_lsn(&self, lsn: u64) { + let mut shared = self.shared.lock().unwrap(); + + assert!(shared.first_valid_lsn == 0); + assert!(shared.last_valid_lsn == 0); + assert!(shared.last_record_lsn == 0); + + shared.first_valid_lsn = lsn; + shared.last_valid_lsn = lsn; + shared.last_record_lsn = lsn; + + self.first_valid_lsn.store(lsn, Ordering::Relaxed); + self.last_valid_lsn.store(lsn, Ordering::Relaxed); + self.last_record_lsn.store(lsn, Ordering::Relaxed); + } + + pub fn get_last_valid_lsn(&self) -> u64 { + let shared = self.shared.lock().unwrap(); + + shared.last_record_lsn + } + + // + // Get statistics to be displayed in the user interface. + // + pub fn get_stats(&self) -> PageCacheStats { + PageCacheStats { + num_entries: self.num_entries.load(Ordering::Relaxed), + num_page_images: self.num_page_images.load(Ordering::Relaxed), + num_wal_records: self.num_wal_records.load(Ordering::Relaxed), + num_getpage_requests: self.num_getpage_requests.load(Ordering::Relaxed), + first_valid_lsn: self.first_valid_lsn.load(Ordering::Relaxed), + last_valid_lsn: self.last_valid_lsn.load(Ordering::Relaxed), + last_record_lsn: self.last_record_lsn.load(Ordering::Relaxed), + } + } + + // Internal functions + + // + // Internal function to get relation size at given LSN. + // + // The caller must ensure that WAL has been received up to 'lsn'. + // + fn relsize_get_nowait(&self, rel: &RelTag, lsn: u64) -> anyhow::Result { + + assert!(lsn <= self.last_valid_lsn.load(Ordering::Acquire)); + + let mut key = CacheKey { + tag: BufferTag { + rel: *rel, + blknum: u32::MAX, + }, + lsn, + }; + let mut buf = BytesMut::new(); + + loop { + buf.clear(); + key.pack(&mut buf); + let mut iter = self.db.iterator(rocksdb::IteratorMode::From( + &buf[..], + rocksdb::Direction::Reverse, + )); + if let Some((k, v)) = iter.next() { + buf.clear(); + buf.extend_from_slice(&k); + let tag = BufferTag::unpack(&mut buf); + if tag.rel == *rel { + buf.clear(); + buf.extend_from_slice(&v); + let content = CacheEntryContent::unpack(&mut buf); + if let Some(rec) = &content.wal_record { + if rec.truncate { + if tag.blknum > 0 { + key.tag.blknum = tag.blknum - 1; + continue; + } + break; + } + } + let relsize = tag.blknum + 1; + debug!("Size of relation {:?} at {} is {}", rel, lsn, relsize); + return Ok(relsize); + } + } + break; + } + debug!("Size of relation {:?} at {} is zero", rel, lsn); + Ok(0) + } + async fn do_gc(&self, conf: &PageServerConf) -> anyhow::Result { let mut minbuf = BytesMut::new(); @@ -499,456 +964,6 @@ impl PageCache { Ok(lsn) } - - // - // GetPage@LSN - // - // Returns an 8k page image - // - pub async fn get_page_at_lsn(&self, tag: BufferTag, req_lsn: u64) -> anyhow::Result { - self.num_getpage_requests.fetch_add(1, Ordering::Relaxed); - - let lsn = self.wait_lsn(req_lsn).await?; - - // Look up cache entry. If it's a page image, return that. If it's a WAL record, - // ask the WAL redo service to reconstruct the page image from the WAL records. - let minkey = CacheKey { tag, lsn: 0 }; - let maxkey = CacheKey { tag, lsn }; - - let mut buf = BytesMut::new(); - minkey.pack(&mut buf); - - let mut readopts = rocksdb::ReadOptions::default(); - readopts.set_iterate_lower_bound(buf.to_vec()); - - buf.clear(); - maxkey.pack(&mut buf); - let mut iter = self.db.iterator_opt( - rocksdb::IteratorMode::From(&buf[..], rocksdb::Direction::Reverse), - readopts, - ); - let entry_opt = iter.next(); - - if entry_opt.is_none() { - static ZERO_PAGE: [u8; 8192] = [0u8; 8192]; - debug!("Page {:?} at {}({}) not found", tag, req_lsn, lsn); - return Ok(Bytes::from_static(&ZERO_PAGE)); - /* return Err("could not find page image")?; */ - } - let (_k, v) = entry_opt.unwrap(); - buf.clear(); - buf.extend_from_slice(&v); - let content = CacheEntryContent::unpack(&mut buf); - let page_img: Bytes; - if let Some(img) = &content.page_image { - page_img = img.clone(); - } else if content.wal_record.is_some() { - - // Request the WAL redo manager to apply the WAL records for us. - page_img = self.walredo_mgr.request_redo(tag, lsn).await?; - } else { - // No base image, and no WAL record. Huh? - bail!("no page image or WAL record for requested page"); - } - - // FIXME: assumes little-endian. Only used for the debugging log though - let page_lsn_hi = u32::from_le_bytes(page_img.get(0..4).unwrap().try_into().unwrap()); - let page_lsn_lo = u32::from_le_bytes(page_img.get(4..8).unwrap().try_into().unwrap()); - debug!( - "Returning page with LSN {:X}/{:X} for {}/{}/{}.{} blk {}", - page_lsn_hi, - page_lsn_lo, - tag.rel.spcnode, - tag.rel.dbnode, - tag.rel.relnode, - tag.rel.forknum, - tag.blknum - ); - - Ok(page_img) - } - - // - // Collect all the WAL records that are needed to reconstruct a page - // image for the given cache entry. - // - // Returns an old page image (if any), and a vector of WAL records to apply - // over it. - // - pub fn collect_records_for_apply(&self, tag: BufferTag, lsn: u64) -> (Option, Vec) { - let minkey = CacheKey { - tag: BufferTag { - rel: tag.rel, - blknum: 0, - }, - lsn: 0, - }; - - let mut buf = BytesMut::new(); - minkey.pack(&mut buf); - - let mut readopts = rocksdb::ReadOptions::default(); - readopts.set_iterate_lower_bound(buf.to_vec()); - - let key = CacheKey { - tag: BufferTag { - rel: tag.rel, - blknum: tag.blknum, - }, - lsn: lsn, - }; - buf.clear(); - key.pack(&mut buf); - let iter = self.db.iterator_opt( - rocksdb::IteratorMode::From(&buf[..], rocksdb::Direction::Reverse), - readopts, - ); - - let mut base_img: Option = None; - let mut records: Vec = Vec::new(); - - // Scan backwards, collecting the WAL records, until we hit an - // old page image. - for (_k, v) in iter { - buf.clear(); - buf.extend_from_slice(&v); - let content = CacheEntryContent::unpack(&mut buf); - if let Some(img) = &content.page_image { - // We have a base image. No need to dig deeper into the list of - // records - base_img = Some(img.clone()); - break; - } else if let Some(rec) = &content.wal_record { - records.push(rec.clone()); - - // If this WAL record initializes the page, no need to dig deeper. - if rec.will_init { - break; - } - } else { - panic!("no base image and no WAL record on cache entry"); - } - } - - records.reverse(); - (base_img, records) - } - - // - // Adds a WAL record to the page cache - // - pub fn put_wal_record(&self, tag: BufferTag, rec: WALRecord) { - let lsn = rec.lsn; - let key = CacheKey { tag, lsn }; - - let content = CacheEntryContent { - page_image: None, - wal_record: Some(rec), - apply_pending: false, - }; - - let mut key_buf = BytesMut::new(); - key.pack(&mut key_buf); - let mut val_buf = BytesMut::new(); - content.pack(&mut val_buf); - - let _res = self.db.put(&key_buf[..], &val_buf[..]); - //trace!("put_wal_record lsn: {}", lsn); - - self.num_entries.fetch_add(1, Ordering::Relaxed); - self.num_wal_records.fetch_add(1, Ordering::Relaxed); - } - - // - // Adds a relation-wide WAL record (like truncate) to the page cache, - // associating it with all pages started with specified block number - // - pub fn put_rel_wal_record(&self, tag: BufferTag, rec: WALRecord) -> anyhow::Result<()> { - let mut key = CacheKey { tag, lsn: rec.lsn }; - - // What was the size of the relation before this record? - let last_lsn = self.last_valid_lsn.load(Ordering::Acquire); - let old_rel_size = self.relsize_get_nowait(&tag.rel, last_lsn)?; - - let content = CacheEntryContent { - page_image: None, - wal_record: Some(rec), - apply_pending: false, - }; - // set new relation size - trace!("Truncate relation {:?}", tag); - let mut key_buf = BytesMut::new(); - let mut val_buf = BytesMut::new(); - content.pack(&mut val_buf); - - for blknum in tag.blknum..old_rel_size { - key_buf.clear(); - key.tag.blknum = blknum; - key.pack(&mut key_buf); - trace!("put_wal_record lsn: {}", key.lsn); - let _res = self.db.put(&key_buf[..], &val_buf[..]); - } - let n = (old_rel_size - tag.blknum) as u64; - self.num_entries.fetch_add(n, Ordering::Relaxed); - self.num_wal_records.fetch_add(n, Ordering::Relaxed); - Ok(()) - } - - // - // Memorize a full image of a page version - // - pub fn put_page_image(&self, tag: BufferTag, lsn: u64, img: Bytes) { - let key = CacheKey { tag, lsn }; - let content = CacheEntryContent { - page_image: Some(img), - wal_record: None, - apply_pending: false, - }; - - let mut key_buf = BytesMut::new(); - key.pack(&mut key_buf); - let mut val_buf = BytesMut::new(); - content.pack(&mut val_buf); - - trace!("put_wal_record lsn: {}", key.lsn); - let _res = self.db.put(&key_buf[..], &val_buf[..]); - - //debug!("inserted page image for {}/{}/{}_{} blk {} at {}", - // tag.spcnode, tag.dbnode, tag.relnode, tag.forknum, tag.blknum, lsn); - self.num_page_images.fetch_add(1, Ordering::Relaxed); - } - - // - pub fn advance_last_valid_lsn(&self, lsn: u64) { - let mut shared = self.shared.lock().unwrap(); - - // Can't move backwards. - let oldlsn = shared.last_valid_lsn; - if lsn >= oldlsn { - shared.last_valid_lsn = lsn; - self.seqwait_lsn.advance(lsn); - - self.last_valid_lsn.store(lsn, Ordering::Relaxed); - } else { - warn!( - "attempted to move last valid LSN backwards (was {:X}/{:X}, new {:X}/{:X})", - oldlsn >> 32, - oldlsn & 0xffffffff, - lsn >> 32, - lsn & 0xffffffff - ); - } - } - - // - // NOTE: this updates last_valid_lsn as well. - // - pub fn advance_last_record_lsn(&self, lsn: u64) { - let mut shared = self.shared.lock().unwrap(); - - // Can't move backwards. - assert!(lsn >= shared.last_valid_lsn); - assert!(lsn >= shared.last_record_lsn); - - shared.last_valid_lsn = lsn; - shared.last_record_lsn = lsn; - self.seqwait_lsn.advance(lsn); - - self.last_valid_lsn.store(lsn, Ordering::Relaxed); - self.last_record_lsn.store(lsn, Ordering::Relaxed); - } - - // - pub fn _advance_first_valid_lsn(&self, lsn: u64) { - let mut shared = self.shared.lock().unwrap(); - - // Can't move backwards. - assert!(lsn >= shared.first_valid_lsn); - - // Can't overtake last_valid_lsn (except when we're - // initializing the system and last_valid_lsn hasn't been set yet. - assert!(shared.last_valid_lsn == 0 || lsn < shared.last_valid_lsn); - - shared.first_valid_lsn = lsn; - self.first_valid_lsn.store(lsn, Ordering::Relaxed); - } - - pub fn init_valid_lsn(&self, lsn: u64) { - let mut shared = self.shared.lock().unwrap(); - - assert!(shared.first_valid_lsn == 0); - assert!(shared.last_valid_lsn == 0); - assert!(shared.last_record_lsn == 0); - - shared.first_valid_lsn = lsn; - shared.last_valid_lsn = lsn; - shared.last_record_lsn = lsn; - - self.first_valid_lsn.store(lsn, Ordering::Relaxed); - self.last_valid_lsn.store(lsn, Ordering::Relaxed); - self.last_record_lsn.store(lsn, Ordering::Relaxed); - } - - pub fn get_last_valid_lsn(&self) -> u64 { - let shared = self.shared.lock().unwrap(); - - shared.last_record_lsn - } - - // - // Get size of relation at given LSN. - // - pub async fn relsize_get(&self, rel: &RelTag, lsn: u64) -> anyhow::Result { - self.wait_lsn(lsn).await?; - self.relsize_get_nowait(rel, lsn) - } - - // - // Internal function to get relation size at given LSN. - // - // The caller must ensure that WAL has been received up to 'lsn'. - // - fn relsize_get_nowait(&self, rel: &RelTag, lsn: u64) -> anyhow::Result { - - assert!(lsn <= self.last_valid_lsn.load(Ordering::Acquire)); - - let mut key = CacheKey { - tag: BufferTag { - rel: *rel, - blknum: u32::MAX, - }, - lsn, - }; - let mut buf = BytesMut::new(); - - loop { - buf.clear(); - key.pack(&mut buf); - let mut iter = self.db.iterator(rocksdb::IteratorMode::From( - &buf[..], - rocksdb::Direction::Reverse, - )); - if let Some((k, v)) = iter.next() { - buf.clear(); - buf.extend_from_slice(&k); - let tag = BufferTag::unpack(&mut buf); - if tag.rel == *rel { - buf.clear(); - buf.extend_from_slice(&v); - let content = CacheEntryContent::unpack(&mut buf); - if let Some(rec) = &content.wal_record { - if rec.truncate { - if tag.blknum > 0 { - key.tag.blknum = tag.blknum - 1; - continue; - } - break; - } - } - let relsize = tag.blknum + 1; - debug!("Size of relation {:?} at {} is {}", rel, lsn, relsize); - return Ok(relsize); - } - } - break; - } - debug!("Size of relation {:?} at {} is zero", rel, lsn); - Ok(0) - } - - pub async fn relsize_exist(&self, rel: &RelTag, req_lsn: u64) -> anyhow::Result { - let lsn = self.wait_lsn(req_lsn).await?; - - let key = CacheKey { - tag: BufferTag { - rel: *rel, - blknum: u32::MAX, - }, - lsn, - }; - let mut buf = BytesMut::new(); - key.pack(&mut buf); - let mut iter = self.db.iterator(rocksdb::IteratorMode::From( - &buf[..], - rocksdb::Direction::Reverse, - )); - if let Some((k, _v)) = iter.next() { - buf.clear(); - buf.extend_from_slice(&k); - let tag = BufferTag::unpack(&mut buf); - if tag.rel == *rel { - debug!("Relation {:?} exists at {}", rel, lsn); - return Ok(true); - } - } - debug!("Relation {:?} doesn't exist at {}", rel, lsn); - Ok(false) - } - - // - // Get statistics to be displayed in the user interface. - // - pub fn get_stats(&self) -> PageCacheStats { - PageCacheStats { - num_entries: self.num_entries.load(Ordering::Relaxed), - num_page_images: self.num_page_images.load(Ordering::Relaxed), - num_wal_records: self.num_wal_records.load(Ordering::Relaxed), - num_getpage_requests: self.num_getpage_requests.load(Ordering::Relaxed), - first_valid_lsn: self.first_valid_lsn.load(Ordering::Relaxed), - last_valid_lsn: self.last_valid_lsn.load(Ordering::Relaxed), - last_record_lsn: self.last_record_lsn.load(Ordering::Relaxed), - } - } - - pub fn create_database( - &self, - lsn: u64, - db_id: Oid, - tablespace_id: Oid, - src_db_id: Oid, - src_tablespace_id: Oid, - ) -> anyhow::Result<()> { - let mut buf = BytesMut::new(); - let key = CacheKey { - tag: BufferTag { - rel: RelTag { - spcnode: src_tablespace_id, - dbnode: src_db_id, - relnode: 0, - forknum: 0u8, - }, - blknum: 0, - }, - lsn: 0, - }; - key.pack(&mut buf); - let iter = self.db.iterator(rocksdb::IteratorMode::From( - &buf[..], - rocksdb::Direction::Forward, - )); - let mut n = 0; - for (k, v) in iter { - buf.clear(); - buf.extend_from_slice(&k); - let mut key = CacheKey::unpack(&mut buf); - if key.tag.rel.spcnode != src_tablespace_id || key.tag.rel.dbnode != src_db_id { - break; - } - key.tag.rel.spcnode = tablespace_id; - key.tag.rel.dbnode = db_id; - key.lsn = lsn; - buf.clear(); - key.pack(&mut buf); - - self.db.put(&buf[..], v)?; - n += 1; - } - info!( - "Create database {}/{}, copy {} entries", - tablespace_id, db_id, n - ); - Ok(()) - } } //