diff --git a/libs/pageserver_api/src/key.rs b/libs/pageserver_api/src/key.rs index 852670af2c..66fc195efc 100644 --- a/libs/pageserver_api/src/key.rs +++ b/libs/pageserver_api/src/key.rs @@ -1,5 +1,6 @@ use anyhow::{bail, Result}; use byteorder::{ByteOrder, BE}; +use bytes::BufMut; use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM}; use postgres_ffi::{Oid, TransactionId}; use serde::{Deserialize, Serialize}; @@ -24,6 +25,60 @@ pub struct Key { pub const KEY_SIZE: usize = 18; impl Key { + /// Transform a raw key to a storage key + pub fn from_raw_key_fixed(key: &[u8; RAW_KEY_SIZE]) -> Self { + Key { + field1: RAW_VALUE_KEY_PREFIX, + field2: u16::from_be_bytes(key[0..2].try_into().unwrap()) as u32, + field3: u32::from_be_bytes(key[2..6].try_into().unwrap()), + field4: u32::from_be_bytes(key[6..10].try_into().unwrap()), + field5: key[10], + field6: u32::from_be_bytes(key[11..15].try_into().unwrap()), + } + } + + pub fn extract_raw_key_to_writer(&self, mut writer: impl BufMut) { + assert_eq!(self.field1, RAW_VALUE_KEY_PREFIX, "prefix mismatched"); + assert!(self.field2 < 0xFFFF); + writer.put_u16(self.field2 as u16); + writer.put_u32(self.field3); + writer.put_u32(self.field4); + writer.put_u8(self.field5); + writer.put_u32(self.field6); + } + + pub fn raw_key_range() -> Range { + Key { + field1: RAW_VALUE_KEY_PREFIX, + field2: 0, + field3: 0, + field4: 0, + field5: 0, + field6: 0, + }..Key { + field1: RAW_VALUE_KEY_PREFIX + 1, + field2: 0, + field3: 0, + field4: 0, + field5: 0, + field6: 0, + } + } + + /// Transform a raw key to a storage key + pub fn from_raw_key(key: &[u8]) -> Self { + Self::from_raw_key_fixed( + key.try_into() + .expect("raw key must has a length of pre-determined fixed-size"), + ) + } + + pub fn extract_raw_key(&self) -> Vec { + let mut key = Vec::with_capacity(RAW_KEY_SIZE); + self.extract_raw_key_to_writer(&mut key); + key + } + /// 'field2' is used to store tablespaceid for relations and small enum numbers for other relish. /// As long as Neon does not support tablespace (because of lack of access to local file system), /// we can assume that only some predefined namespace OIDs are used which can fit in u16 @@ -472,6 +527,9 @@ pub const AUX_FILES_KEY: Key = Key { field6: 2, }; +pub const RAW_VALUE_KEY_PREFIX: u8 = 0x04; +pub const RAW_KEY_SIZE: usize = 15; + // Reverse mappings for a few Keys. // These are needed by WAL redo manager. diff --git a/libs/pageserver_api/src/keyspace.rs b/libs/pageserver_api/src/keyspace.rs index 05fa4562e1..4265630de6 100644 --- a/libs/pageserver_api/src/keyspace.rs +++ b/libs/pageserver_api/src/keyspace.rs @@ -15,6 +15,11 @@ pub struct KeySpace { } impl KeySpace { + /// Create a key space with a single range + pub fn single(key_range: Range) -> Self { + Self { ranges: vec![key_range] } + } + /// /// Partition a key space into roughly chunks of roughly 'target_size' bytes /// in each partition. diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index 0479d05f8f..9aac95887f 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -11,9 +11,10 @@ //! from data stored in object storage. //! use anyhow::{anyhow, bail, ensure, Context}; -use bytes::{BufMut, Bytes, BytesMut}; +use bytes::{Buf, BufMut, Bytes, BytesMut}; use fail::fail_point; use pageserver_api::key::{key_to_slru_block, Key}; +use pageserver_api::keyspace::KeySpace; use postgres_ffi::pg_constants; use std::fmt::Write as FmtWrite; use std::time::SystemTime; @@ -24,7 +25,7 @@ use tracing::*; use tokio_tar::{Builder, EntryType, Header}; use crate::context::RequestContext; -use crate::pgdatadir_mapping::Version; +use crate::pgdatadir_mapping::{Version, AUX_FILES_V2}; use crate::tenant::Timeline; use pageserver_api::reltag::{RelTag, SlruKind}; @@ -312,20 +313,59 @@ where } } - for (path, content) in self.timeline.list_aux_files(self.lsn, self.ctx).await? { - if path.starts_with("pg_replslot") { - let offs = pg_constants::REPL_SLOT_ON_DISK_OFFSETOF_RESTART_LSN; - let restart_lsn = Lsn(u64::from_le_bytes( - content[offs..offs + 8].try_into().unwrap(), - )); - info!("Replication slot {} restart LSN={}", path, restart_lsn); - min_restart_lsn = Lsn::min(min_restart_lsn, restart_lsn); + if AUX_FILES_V2 { + let files = self + .timeline + .get_vectored_alternative( + KeySpace::single(Key::raw_key_range()), + self.lsn, + self.ctx, + ) + .await?; + for (_, val) in files { + let val = val?; + let mut ptr = &val[..]; + while ptr.has_remaining() { + let key_len = ptr.get_u32() as usize; + let key = &ptr[..key_len]; + ptr.advance(key_len); + let val_len = ptr.get_u32() as usize; + let content = &ptr[..val_len]; + ptr.advance(val_len); + + let path = std::str::from_utf8(key)?; + if path.starts_with("pg_replslot") { + let offs = pg_constants::REPL_SLOT_ON_DISK_OFFSETOF_RESTART_LSN; + let restart_lsn = Lsn(u64::from_le_bytes( + content[offs..offs + 8].try_into().unwrap(), + )); + info!("Replication slot {} restart LSN={}", path, restart_lsn); + min_restart_lsn = Lsn::min(min_restart_lsn, restart_lsn); + } + info!("adding aux file {} to basebackup, length={}", path, val_len); + let header = new_tar_header(&path, content.len() as u64)?; + self.ar + .append(&header, &*content) + .await + .context("could not add aux file to basebackup tarball")?; + } + } + } else { + for (path, content) in self.timeline.list_aux_files(self.lsn, self.ctx).await? { + if path.starts_with("pg_replslot") { + let offs = pg_constants::REPL_SLOT_ON_DISK_OFFSETOF_RESTART_LSN; + let restart_lsn = Lsn(u64::from_le_bytes( + content[offs..offs + 8].try_into().unwrap(), + )); + info!("Replication slot {} restart LSN={}", path, restart_lsn); + min_restart_lsn = Lsn::min(min_restart_lsn, restart_lsn); + } + let header = new_tar_header(&path, content.len() as u64)?; + self.ar + .append(&header, &*content) + .await + .context("could not add aux file to basebackup tarball")?; } - let header = new_tar_header(&path, content.len() as u64)?; - self.ar - .append(&header, &*content) - .await - .context("could not add aux file to basebackup tarball")?; } } if min_restart_lsn != Lsn::MAX { diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 6f7d74bdee..b0635cf5c6 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -13,14 +13,14 @@ use crate::repository::*; use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id; use crate::walrecord::NeonWalRecord; use anyhow::{ensure, Context}; -use bytes::{Buf, Bytes, BytesMut}; +use bytes::{Buf, BufMut, Bytes, BytesMut}; use enum_map::Enum; use itertools::Itertools; use pageserver_api::key::{ dbdir_key_range, is_rel_block_key, is_slru_block_key, rel_block_to_key, rel_dir_to_key, rel_key_range, rel_size_to_key, relmap_file_key, slru_block_to_key, slru_dir_to_key, slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range, - AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY, + AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, RAW_KEY_SIZE, TWOPHASEDIR_KEY, }; use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind}; use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM}; @@ -32,7 +32,7 @@ use std::ops::ControlFlow; use std::ops::Range; use strum::IntoEnumIterator; use tokio_util::sync::CancellationToken; -use tracing::{debug, trace, warn}; +use tracing::{debug, info, trace, warn}; use utils::bin_ser::DeserializeError; use utils::vec_map::{VecMap, VecMapOrdering}; use utils::{bin_ser::BeSer, lsn::Lsn}; @@ -854,6 +854,46 @@ impl Timeline { } } +pub const AUX_FILES_V2: bool = true; + +fn split_prefix<'a, 'b>(path: &'a str, prefix: &'b str) -> Option<&'a str> { + if path.starts_with(prefix) { + Some(&path[prefix.len()..]) + } else { + None + } +} + +fn hash_to_raw_key(prefix: u16, data: &[u8]) -> [u8; RAW_KEY_SIZE] { + let mut key = [0; RAW_KEY_SIZE]; + let hash = crc32c::crc32c(data).to_be_bytes(); + let prefix = prefix.to_be_bytes(); + key[0] = prefix[0]; + key[1] = prefix[1]; + key[2] = hash[0]; + key[3] = hash[1]; + key[4] = hash[2]; + key[5] = hash[3]; + key +} + +pub fn encode_aux_file_key(path: &str) -> anyhow::Result { + if let Some(fname) = split_prefix(path, "pg_logical/") { + let key = hash_to_raw_key(0x0101, fname.as_bytes()); + Ok(Key::from_raw_key_fixed(&key)) + } else if let Some(fname) = split_prefix(path, "pg_replslot/") { + let key = hash_to_raw_key(0x0102, fname.as_bytes()); + Ok(Key::from_raw_key_fixed(&key)) + } else { + let key = hash_to_raw_key(0x01FF, path.as_bytes()); + warn!( + "unsupported aux file type: {}, putting to other files, non-scan-able by prefix", + path + ); + Ok(Key::from_raw_key_fixed(&key)) + } +} + /// DatadirModification represents an operation to ingest an atomic set of /// updates to the repository. It is created by the 'begin_record' /// function. It is called for each WAL record, so that all the modifications @@ -1391,6 +1431,83 @@ impl<'a> DatadirModification<'a> { content: &[u8], ctx: &RequestContext, ) -> anyhow::Result<()> { + if AUX_FILES_V2 { + fn filter_path_from(val: &[u8], path: &str) -> Bytes { + let mut new_val = BytesMut::new(); + let mut ptr = val; + while ptr.has_remaining() { + let key_len = ptr.get_u32() as usize; + let key = &ptr[..key_len]; + ptr.advance(key_len); + let val_len = ptr.get_u32() as usize; + let value = &ptr[..val_len]; + ptr.advance(val_len); + if key != path.as_bytes() { + new_val.put_u32(key_len as u32); + new_val.put_slice(key); + new_val.put_u32(val_len as u32); + new_val.put_slice(value); + } + } + new_val.freeze() + } + + fn update_path_to(val: &[u8], path: &str, content: &[u8]) -> Bytes { + let mut new_val = BytesMut::new(); + let mut ptr = val; + let mut replaced = false; + while ptr.has_remaining() { + let key_len = ptr.get_u32() as usize; + let key = &ptr[..key_len]; + ptr.advance(key_len); + let val_len = ptr.get_u32() as usize; + let value = &ptr[..val_len]; + ptr.advance(val_len); + if key != path.as_bytes() { + new_val.put_u32(key_len as u32); + new_val.put_slice(key); + new_val.put_u32(val_len as u32); + new_val.put_slice(value); + } else { + new_val.put_u32(path.len() as u32); + new_val.put_slice(path.as_bytes()); + new_val.put_u32(content.len() as u32); + new_val.put_slice(content); + replaced = true; + } + } + if !replaced { + new_val.put_u32(path.len() as u32); + new_val.put_slice(path.as_bytes()); + new_val.put_u32(content.len() as u32); + new_val.put_slice(content); + } + new_val.freeze() + } + + let key = encode_aux_file_key(path)?; + // retrieve the key from the engine + let old_val = match self.get(key, ctx).await { + Ok(val) => val, + Err(PageReconstructError::Other(err)) + if err.to_string().contains("could not find data for key") => + { + // TODO: make could not found a separate error type, avoid anyhow wrapping + Bytes::new() + } + Err(e) => return Err(e.into()), + }; + let new_val = if content.is_empty() { + info!("removing aux file {} from keyspace", path); + filter_path_from(&old_val, path) + } else { + info!("adding aux file {} to keyspace, length={}", path, content.len()); + update_path_to(&old_val, path, content) + }; + self.put(key, Value::Image(new_val)); + return Ok(()); + } + let file_path = path.to_string(); let content = if content.is_empty() { None @@ -1587,8 +1704,6 @@ impl<'a> DatadirModification<'a> { self.pending_updates.len() + self.pending_deletions.len() } - // Internal helper functions to batch the modifications - async fn get(&self, key: Key, ctx: &RequestContext) -> Result { // Have we already updated the same key? Read the latest pending updated // version in that case. diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index 29751641b4..a3be6c854f 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -371,26 +371,23 @@ impl InMemoryLayer { let mut planned_block_reads = BinaryHeap::new(); - for range in keyspace.ranges.iter() { - let mut key = range.start; - while key < range.end { - if let Some(vec_map) = inner.index.get(&key) { - let lsn_range = match reconstruct_state.get_cached_lsn(&key) { - Some(cached_lsn) => (cached_lsn + 1)..end_lsn, - None => self.start_lsn..end_lsn, - }; + for (key, vec_map) in inner.index.iter() { + let key = key.clone(); + if !keyspace.overlaps(&(key.clone()..key.next())) { + continue; + } + let lsn_range = match reconstruct_state.get_cached_lsn(&key) { + Some(cached_lsn) => (cached_lsn + 1)..end_lsn, + None => self.start_lsn..end_lsn, + }; - let slice = vec_map.slice_range(lsn_range); - for (entry_lsn, pos) in slice.iter().rev() { - planned_block_reads.push(BlockRead { - key, - lsn: *entry_lsn, - block_offset: *pos, - }); - } - } - - key = key.next(); + let slice = vec_map.slice_range(lsn_range); + for (entry_lsn, pos) in slice.iter().rev() { + planned_block_reads.push(BlockRead { + key, + lsn: *entry_lsn, + block_offset: *pos, + }); } } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index d046a60af4..2045a7fad8 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -680,6 +680,7 @@ impl Timeline { self.timeline_get_throttle.throttle(ctx, 1).await; self.get_impl(key, lsn, ctx).await } + /// Not subject to [`Self::timeline_get_throttle`]. async fn get_impl( &self, @@ -854,6 +855,20 @@ impl Timeline { res } + pub(crate) async fn get_vectored_alternative( + &self, + keyspace: KeySpace, + lsn: Lsn, + ctx: &RequestContext, + ) -> Result>, GetVectoredError> { + if !lsn.is_valid() { + return Err(GetVectoredError::InvalidLsn(lsn)); + } + let vectored_res = self.get_vectored_impl(keyspace.clone(), lsn, ctx).await; + vectored_res + } + + /// Not subject to [`Self::timeline_get_throttle`]. pub(super) async fn get_vectored_sequential_impl( &self, @@ -2976,9 +2991,9 @@ impl Timeline { timeline = &*timeline_owned; } - if keyspace.total_size() != 0 { - return Err(GetVectoredError::MissingKey(keyspace.start().unwrap())); - } + // if keyspace.total_size() != 0 { + // return Err(GetVectoredError::MissingKey(keyspace.start().unwrap())); + // } Ok(()) } @@ -3078,7 +3093,6 @@ impl Timeline { ctx, ) .await?; - unmapped_keyspace = keyspace_to_read; cont_lsn = next_cont_lsn; } else {