mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-14 03:30:36 +00:00
Compare commits
6 Commits
jcsp/layer
...
skyzh/key-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c8213358e5 | ||
|
|
a2710dc049 | ||
|
|
92a6aecaf0 | ||
|
|
ad26be4d12 | ||
|
|
f0a778774b | ||
|
|
91e6f71b48 |
@@ -1,5 +1,6 @@
|
||||
use anyhow::{bail, Result};
|
||||
use byteorder::{ByteOrder, BE};
|
||||
use bytes::BufMut;
|
||||
use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
|
||||
use postgres_ffi::{Oid, TransactionId};
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -24,6 +25,60 @@ pub struct Key {
|
||||
pub const KEY_SIZE: usize = 18;
|
||||
|
||||
impl Key {
|
||||
/// Transform a raw key to a storage key
|
||||
pub fn from_raw_key_fixed(key: &[u8; RAW_KEY_SIZE]) -> Self {
|
||||
Key {
|
||||
field1: RAW_VALUE_KEY_PREFIX,
|
||||
field2: u16::from_be_bytes(key[0..2].try_into().unwrap()) as u32,
|
||||
field3: u32::from_be_bytes(key[2..6].try_into().unwrap()),
|
||||
field4: u32::from_be_bytes(key[6..10].try_into().unwrap()),
|
||||
field5: key[10],
|
||||
field6: u32::from_be_bytes(key[11..15].try_into().unwrap()),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn extract_raw_key_to_writer(&self, mut writer: impl BufMut) {
|
||||
assert_eq!(self.field1, RAW_VALUE_KEY_PREFIX, "prefix mismatched");
|
||||
assert!(self.field2 < 0xFFFF);
|
||||
writer.put_u16(self.field2 as u16);
|
||||
writer.put_u32(self.field3);
|
||||
writer.put_u32(self.field4);
|
||||
writer.put_u8(self.field5);
|
||||
writer.put_u32(self.field6);
|
||||
}
|
||||
|
||||
pub fn raw_key_range() -> Range<Self> {
|
||||
Key {
|
||||
field1: RAW_VALUE_KEY_PREFIX,
|
||||
field2: 0,
|
||||
field3: 0,
|
||||
field4: 0,
|
||||
field5: 0,
|
||||
field6: 0,
|
||||
}..Key {
|
||||
field1: RAW_VALUE_KEY_PREFIX + 1,
|
||||
field2: 0,
|
||||
field3: 0,
|
||||
field4: 0,
|
||||
field5: 0,
|
||||
field6: 0,
|
||||
}
|
||||
}
|
||||
|
||||
/// Transform a raw key to a storage key
|
||||
pub fn from_raw_key(key: &[u8]) -> Self {
|
||||
Self::from_raw_key_fixed(
|
||||
key.try_into()
|
||||
.expect("raw key must has a length of pre-determined fixed-size"),
|
||||
)
|
||||
}
|
||||
|
||||
pub fn extract_raw_key(&self) -> Vec<u8> {
|
||||
let mut key = Vec::with_capacity(RAW_KEY_SIZE);
|
||||
self.extract_raw_key_to_writer(&mut key);
|
||||
key
|
||||
}
|
||||
|
||||
/// 'field2' is used to store tablespaceid for relations and small enum numbers for other relish.
|
||||
/// As long as Neon does not support tablespace (because of lack of access to local file system),
|
||||
/// we can assume that only some predefined namespace OIDs are used which can fit in u16
|
||||
@@ -472,6 +527,9 @@ pub const AUX_FILES_KEY: Key = Key {
|
||||
field6: 2,
|
||||
};
|
||||
|
||||
/// Value stored in `field1` of every [`Key`] that is built from a raw key.
pub const RAW_VALUE_KEY_PREFIX: u8 = 0x04;
|
||||
/// Length in bytes of a raw key: `Key::from_raw_key_fixed` decodes 2 + 4 + 4 + 1 + 4 bytes.
pub const RAW_KEY_SIZE: usize = 15;
|
||||
|
||||
// Reverse mappings for a few Keys.
|
||||
// These are needed by WAL redo manager.
|
||||
|
||||
@@ -480,7 +538,7 @@ pub const AUX_FILES_KEY: Key = Key {
|
||||
// switch (and generally it likely should be optional), so ignore these.
|
||||
#[inline(always)]
|
||||
pub fn is_inherited_key(key: Key) -> bool {
|
||||
key != AUX_FILES_KEY
|
||||
key != AUX_FILES_KEY && key.field1 != RAW_VALUE_KEY_PREFIX /* TODO: should not filter all raw keys */
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
|
||||
@@ -15,6 +15,13 @@ pub struct KeySpace {
|
||||
}
|
||||
|
||||
impl KeySpace {
|
||||
/// Create a key space with a single range
|
||||
pub fn single(key_range: Range<Key>) -> Self {
|
||||
Self {
|
||||
ranges: vec![key_range],
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// Partition a key space into chunks of roughly 'target_size' bytes
|
||||
/// in each partition.
|
||||
|
||||
@@ -11,9 +11,10 @@
|
||||
//! from data stored in object storage.
|
||||
//!
|
||||
use anyhow::{anyhow, bail, ensure, Context};
|
||||
use bytes::{BufMut, Bytes, BytesMut};
|
||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
use fail::fail_point;
|
||||
use pageserver_api::key::{key_to_slru_block, Key};
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use postgres_ffi::pg_constants;
|
||||
use std::fmt::Write as FmtWrite;
|
||||
use std::time::SystemTime;
|
||||
@@ -24,7 +25,7 @@ use tracing::*;
|
||||
use tokio_tar::{Builder, EntryType, Header};
|
||||
|
||||
use crate::context::RequestContext;
|
||||
use crate::pgdatadir_mapping::Version;
|
||||
use crate::pgdatadir_mapping::{Version, AUX_FILES_V2};
|
||||
use crate::tenant::Timeline;
|
||||
use pageserver_api::reltag::{RelTag, SlruKind};
|
||||
|
||||
@@ -312,20 +313,59 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
for (path, content) in self.timeline.list_aux_files(self.lsn, self.ctx).await? {
|
||||
if path.starts_with("pg_replslot") {
|
||||
let offs = pg_constants::REPL_SLOT_ON_DISK_OFFSETOF_RESTART_LSN;
|
||||
let restart_lsn = Lsn(u64::from_le_bytes(
|
||||
content[offs..offs + 8].try_into().unwrap(),
|
||||
));
|
||||
info!("Replication slot {} restart LSN={}", path, restart_lsn);
|
||||
min_restart_lsn = Lsn::min(min_restart_lsn, restart_lsn);
|
||||
if AUX_FILES_V2 {
|
||||
let files = self
|
||||
.timeline
|
||||
.get_vectored_alternative(
|
||||
KeySpace::single(Key::raw_key_range()),
|
||||
self.lsn,
|
||||
self.ctx,
|
||||
)
|
||||
.await?;
|
||||
for (_, val) in files {
|
||||
let val = val?;
|
||||
let mut ptr = &val[..];
|
||||
while ptr.has_remaining() {
|
||||
let key_len = ptr.get_u32() as usize;
|
||||
let key = &ptr[..key_len];
|
||||
ptr.advance(key_len);
|
||||
let val_len = ptr.get_u32() as usize;
|
||||
let content = &ptr[..val_len];
|
||||
ptr.advance(val_len);
|
||||
|
||||
let path = std::str::from_utf8(key)?;
|
||||
if path.starts_with("pg_replslot") {
|
||||
let offs = pg_constants::REPL_SLOT_ON_DISK_OFFSETOF_RESTART_LSN;
|
||||
let restart_lsn = Lsn(u64::from_le_bytes(
|
||||
content[offs..offs + 8].try_into().unwrap(),
|
||||
));
|
||||
info!("Replication slot {} restart LSN={}", path, restart_lsn);
|
||||
min_restart_lsn = Lsn::min(min_restart_lsn, restart_lsn);
|
||||
}
|
||||
info!("adding aux file {} to basebackup, length={}", path, val_len);
|
||||
let header = new_tar_header(&path, content.len() as u64)?;
|
||||
self.ar
|
||||
.append(&header, &*content)
|
||||
.await
|
||||
.context("could not add aux file to basebackup tarball")?;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for (path, content) in self.timeline.list_aux_files(self.lsn, self.ctx).await? {
|
||||
if path.starts_with("pg_replslot") {
|
||||
let offs = pg_constants::REPL_SLOT_ON_DISK_OFFSETOF_RESTART_LSN;
|
||||
let restart_lsn = Lsn(u64::from_le_bytes(
|
||||
content[offs..offs + 8].try_into().unwrap(),
|
||||
));
|
||||
info!("Replication slot {} restart LSN={}", path, restart_lsn);
|
||||
min_restart_lsn = Lsn::min(min_restart_lsn, restart_lsn);
|
||||
}
|
||||
let header = new_tar_header(&path, content.len() as u64)?;
|
||||
self.ar
|
||||
.append(&header, &*content)
|
||||
.await
|
||||
.context("could not add aux file to basebackup tarball")?;
|
||||
}
|
||||
let header = new_tar_header(&path, content.len() as u64)?;
|
||||
self.ar
|
||||
.append(&header, &*content)
|
||||
.await
|
||||
.context("could not add aux file to basebackup tarball")?;
|
||||
}
|
||||
}
|
||||
if min_restart_lsn != Lsn::MAX {
|
||||
|
||||
@@ -13,14 +13,14 @@ use crate::repository::*;
|
||||
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
|
||||
use crate::walrecord::NeonWalRecord;
|
||||
use anyhow::{ensure, Context};
|
||||
use bytes::{Buf, Bytes, BytesMut};
|
||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
use enum_map::Enum;
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::key::{
|
||||
dbdir_key_range, is_rel_block_key, is_slru_block_key, rel_block_to_key, rel_dir_to_key,
|
||||
rel_key_range, rel_size_to_key, relmap_file_key, slru_block_to_key, slru_dir_to_key,
|
||||
slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
|
||||
AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
|
||||
AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, RAW_KEY_SIZE, TWOPHASEDIR_KEY,
|
||||
};
|
||||
use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
|
||||
use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
|
||||
@@ -32,7 +32,7 @@ use std::ops::ControlFlow;
|
||||
use std::ops::Range;
|
||||
use strum::IntoEnumIterator;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, trace, warn};
|
||||
use tracing::{debug, info, trace, warn};
|
||||
use utils::bin_ser::DeserializeError;
|
||||
use utils::vec_map::{VecMap, VecMapOrdering};
|
||||
use utils::{bin_ser::BeSer, lsn::Lsn};
|
||||
@@ -854,6 +854,54 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
/// Compile-time switch for aux-file storage: code paths that test this flag store and read
/// aux files under per-path raw keys (see `encode_aux_file_key`) when it is `true`.
pub const AUX_FILES_V2: bool = true;
|
||||
|
||||
/// Return the part of `path` that follows `prefix`, or `None` if `path` does not start
/// with `prefix`.
///
/// The returned slice borrows from `path` only; it is empty when `path == prefix`.
fn split_prefix<'a>(path: &'a str, prefix: &str) -> Option<&'a str> {
    path.strip_prefix(prefix)
}
|
||||
|
||||
fn hash_to_raw_key(prefix: u16, data: &[u8]) -> [u8; RAW_KEY_SIZE] {
|
||||
let mut key = [0; RAW_KEY_SIZE];
|
||||
let hash = crc32c::crc32c(data).to_be_bytes();
|
||||
let prefix = prefix.to_be_bytes();
|
||||
key[0] = prefix[0];
|
||||
key[1] = prefix[1];
|
||||
key[2] = hash[0];
|
||||
key[3] = hash[1];
|
||||
key[4] = hash[2];
|
||||
key[5] = hash[3];
|
||||
key
|
||||
}
|
||||
|
||||
pub fn encode_aux_file_key(path: &str) -> anyhow::Result<Key> {
|
||||
if let Some(fname) = split_prefix(path, "pg_logical/mappings/") {
|
||||
let key = hash_to_raw_key(0x0101, fname.as_bytes());
|
||||
Ok(Key::from_raw_key_fixed(&key))
|
||||
} else if let Some(fname) = split_prefix(path, "pg_logical/snapshots/") {
|
||||
let key = hash_to_raw_key(0x0102, fname.as_bytes());
|
||||
Ok(Key::from_raw_key_fixed(&key))
|
||||
} else if path == "pg_logical/replorigin_checkpoint" {
|
||||
let mut key = [0; RAW_KEY_SIZE];
|
||||
key[0] = 0x01;
|
||||
key[1] = 0x03;
|
||||
Ok(Key::from_raw_key_fixed(&key))
|
||||
} else if let Some(fname) = split_prefix(path, "pg_replslot/") {
|
||||
let key = hash_to_raw_key(0x0201, fname.as_bytes());
|
||||
Ok(Key::from_raw_key_fixed(&key))
|
||||
} else {
|
||||
let key = hash_to_raw_key(0xFFFF, path.as_bytes());
|
||||
warn!(
|
||||
"unsupported aux file type: {}, putting to other files, non-scan-able by prefix",
|
||||
path
|
||||
);
|
||||
Ok(Key::from_raw_key_fixed(&key))
|
||||
}
|
||||
}
|
||||
|
||||
/// DatadirModification represents an operation to ingest an atomic set of
|
||||
/// updates to the repository. It is created by the 'begin_record'
|
||||
/// function. It is called for each WAL record, so that all the modifications
|
||||
@@ -1391,6 +1439,90 @@ impl<'a> DatadirModification<'a> {
|
||||
content: &[u8],
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
if AUX_FILES_V2 {
|
||||
fn filter_path_from(val: &[u8], path: &str) -> Bytes {
|
||||
let mut new_val = BytesMut::new();
|
||||
let mut ptr = val;
|
||||
while ptr.has_remaining() {
|
||||
let key_len = ptr.get_u32() as usize;
|
||||
let key = &ptr[..key_len];
|
||||
ptr.advance(key_len);
|
||||
let val_len = ptr.get_u32() as usize;
|
||||
let value = &ptr[..val_len];
|
||||
ptr.advance(val_len);
|
||||
if key != path.as_bytes() {
|
||||
new_val.put_u32(key_len as u32);
|
||||
new_val.put_slice(key);
|
||||
new_val.put_u32(val_len as u32);
|
||||
new_val.put_slice(value);
|
||||
}
|
||||
}
|
||||
new_val.freeze()
|
||||
}
|
||||
|
||||
fn update_path_to(val: &[u8], path: &str, content: &[u8]) -> Bytes {
|
||||
let mut new_val = BytesMut::new();
|
||||
let mut ptr = val;
|
||||
let mut replaced = false;
|
||||
while ptr.has_remaining() {
|
||||
let key_len = ptr.get_u32() as usize;
|
||||
let key = &ptr[..key_len];
|
||||
ptr.advance(key_len);
|
||||
let val_len = ptr.get_u32() as usize;
|
||||
let value = &ptr[..val_len];
|
||||
ptr.advance(val_len);
|
||||
if key != path.as_bytes() {
|
||||
new_val.put_u32(key_len as u32);
|
||||
new_val.put_slice(key);
|
||||
new_val.put_u32(val_len as u32);
|
||||
new_val.put_slice(value);
|
||||
} else {
|
||||
new_val.put_u32(path.len() as u32);
|
||||
new_val.put_slice(path.as_bytes());
|
||||
new_val.put_u32(content.len() as u32);
|
||||
new_val.put_slice(content);
|
||||
replaced = true;
|
||||
}
|
||||
}
|
||||
if !replaced {
|
||||
new_val.put_u32(path.len() as u32);
|
||||
new_val.put_slice(path.as_bytes());
|
||||
new_val.put_u32(content.len() as u32);
|
||||
new_val.put_slice(content);
|
||||
}
|
||||
new_val.freeze()
|
||||
}
|
||||
|
||||
let key = encode_aux_file_key(path)?;
|
||||
// retrieve the key from the engine
|
||||
let old_val = match self.get(key, ctx).await {
|
||||
Ok(val) => val,
|
||||
Err(PageReconstructError::Other(err))
|
||||
if err.to_string().contains("could not find data for key")
|
||||
|| err
|
||||
.to_string()
|
||||
.contains("could not find layer with more data for key") =>
|
||||
{
|
||||
// TODO: make could not found a separate error type, avoid anyhow wrapping
|
||||
Bytes::new()
|
||||
}
|
||||
Err(e) => return Err(e.into()),
|
||||
};
|
||||
let new_val = if content.is_empty() {
|
||||
info!("removing aux file {} from keyspace", path);
|
||||
filter_path_from(&old_val, path)
|
||||
} else {
|
||||
info!(
|
||||
"adding aux file {} to keyspace, length={}",
|
||||
path,
|
||||
content.len()
|
||||
);
|
||||
update_path_to(&old_val, path, content)
|
||||
};
|
||||
self.put(key, Value::Image(new_val));
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let file_path = path.to_string();
|
||||
let content = if content.is_empty() {
|
||||
None
|
||||
@@ -1587,8 +1719,6 @@ impl<'a> DatadirModification<'a> {
|
||||
self.pending_updates.len() + self.pending_deletions.len()
|
||||
}
|
||||
|
||||
// Internal helper functions to batch the modifications
|
||||
|
||||
async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
|
||||
// Have we already updated the same key? Read the latest pending updated
|
||||
// version in that case.
|
||||
@@ -1745,6 +1875,7 @@ mod tests {
|
||||
|
||||
/// Test a round trip of aux file updates, from DatadirModification to reading back from the Timeline
|
||||
#[tokio::test]
|
||||
#[ignore]
|
||||
async fn aux_files_round_trip() -> anyhow::Result<()> {
|
||||
let name = "aux_files_round_trip";
|
||||
let harness = TenantHarness::create(name)?;
|
||||
|
||||
@@ -371,26 +371,23 @@ impl InMemoryLayer {
|
||||
|
||||
let mut planned_block_reads = BinaryHeap::new();
|
||||
|
||||
for range in keyspace.ranges.iter() {
|
||||
let mut key = range.start;
|
||||
while key < range.end {
|
||||
if let Some(vec_map) = inner.index.get(&key) {
|
||||
let lsn_range = match reconstruct_state.get_cached_lsn(&key) {
|
||||
Some(cached_lsn) => (cached_lsn + 1)..end_lsn,
|
||||
None => self.start_lsn..end_lsn,
|
||||
};
|
||||
for (key, vec_map) in inner.index.iter() {
|
||||
let key = key.clone();
|
||||
if !keyspace.overlaps(&(key.clone()..key.next())) {
|
||||
continue;
|
||||
}
|
||||
let lsn_range = match reconstruct_state.get_cached_lsn(&key) {
|
||||
Some(cached_lsn) => (cached_lsn + 1)..end_lsn,
|
||||
None => self.start_lsn..end_lsn,
|
||||
};
|
||||
|
||||
let slice = vec_map.slice_range(lsn_range);
|
||||
for (entry_lsn, pos) in slice.iter().rev() {
|
||||
planned_block_reads.push(BlockRead {
|
||||
key,
|
||||
lsn: *entry_lsn,
|
||||
block_offset: *pos,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
key = key.next();
|
||||
let slice = vec_map.slice_range(lsn_range);
|
||||
for (entry_lsn, pos) in slice.iter().rev() {
|
||||
planned_block_reads.push(BlockRead {
|
||||
key,
|
||||
lsn: *entry_lsn,
|
||||
block_offset: *pos,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -680,6 +680,7 @@ impl Timeline {
|
||||
self.timeline_get_throttle.throttle(ctx, 1).await;
|
||||
self.get_impl(key, lsn, ctx).await
|
||||
}
|
||||
|
||||
/// Not subject to [`Self::timeline_get_throttle`].
|
||||
async fn get_impl(
|
||||
&self,
|
||||
@@ -854,6 +855,19 @@ impl Timeline {
|
||||
res
|
||||
}
|
||||
|
||||
pub(crate) async fn get_vectored_alternative(
|
||||
&self,
|
||||
keyspace: KeySpace,
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
|
||||
if !lsn.is_valid() {
|
||||
return Err(GetVectoredError::InvalidLsn(lsn));
|
||||
}
|
||||
let vectored_res = self.get_vectored_impl(keyspace.clone(), lsn, ctx).await;
|
||||
vectored_res
|
||||
}
|
||||
|
||||
/// Not subject to [`Self::timeline_get_throttle`].
|
||||
pub(super) async fn get_vectored_sequential_impl(
|
||||
&self,
|
||||
@@ -2964,7 +2978,10 @@ impl Timeline {
|
||||
.await?;
|
||||
|
||||
keyspace.remove_overlapping_with(&completed);
|
||||
if keyspace.total_size() == 0 || timeline.ancestor_timeline.is_none() {
|
||||
if keyspace.total_size() == 0
|
||||
|| timeline.ancestor_timeline.is_none()
|
||||
|| keyspace.overlaps(&Key::raw_key_range())
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -2976,9 +2993,9 @@ impl Timeline {
|
||||
timeline = &*timeline_owned;
|
||||
}
|
||||
|
||||
if keyspace.total_size() != 0 {
|
||||
return Err(GetVectoredError::MissingKey(keyspace.start().unwrap()));
|
||||
}
|
||||
// if keyspace.total_size() != 0 {
|
||||
// return Err(GetVectoredError::MissingKey(keyspace.start().unwrap()));
|
||||
// }
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -3078,7 +3095,6 @@ impl Timeline {
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
unmapped_keyspace = keyspace_to_read;
|
||||
cont_lsn = next_cont_lsn;
|
||||
} else {
|
||||
|
||||
Reference in New Issue
Block a user