Heikki Linnakangas
2022-03-09 20:35:16 +02:00
parent da8beffc95
commit e7bd74d558
10 changed files with 43 additions and 107 deletions


@@ -201,7 +201,7 @@ fn import_relfile<R: Repository>(
Err(err) => match err.kind() {
std::io::ErrorKind::UnexpectedEof => {
// reached EOF. That's expected.
// FIXME: maybe check that we read the full length of the file?
ensure!(blknum == nblocks as u32, "unexpected EOF");
break;
}
_ => {
@@ -211,17 +211,11 @@ fn import_relfile<R: Repository>(
};
blknum += 1;
}
ensure!(blknum == nblocks as u32);
Ok(())
}
/// FIXME
/// Import a "non-blocky" file into the repository
///
/// This is used for small files like the control file, twophase files etc. that
/// are just slurped into the repository as one blob.
///
/// Import a relmapper (pg_filenode.map) file into the repository
fn import_relmap_file<R: Repository>(
timeline: &mut DatadirTimelineWriter<R>,
spcnode: Oid,
@@ -239,6 +233,7 @@ fn import_relmap_file<R: Repository>(
Ok(())
}
/// Import a twophase state file (pg_twophase/<xid>) into the repository
fn import_twophase_file<R: Repository>(
timeline: &mut DatadirTimelineWriter<R>,
xid: TransactionId,
@@ -316,7 +311,7 @@ fn import_slru_file<R: Repository>(
Err(err) => match err.kind() {
std::io::ErrorKind::UnexpectedEof => {
// reached EOF. That's expected.
// FIXME: maybe check that we read the full length of the file?
ensure!(rpageno == nblocks as u32, "unexpected EOF");
break;
}
_ => {
@@ -326,7 +321,6 @@ fn import_slru_file<R: Repository>(
};
rpageno += 1;
}
ensure!(rpageno == nblocks as u32);
Ok(())
}
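Both hunks in this file move the same post-loop length check into the EOF arm itself, so a truncated file now fails with a message at the point where the shortfall is detected. A minimal self-contained sketch of the pattern, using hypothetical names (read_all_pages is an illustration, not actual pageserver code):

use std::io::Read;

/// Read fixed-size 8192-byte pages until EOF, verifying that the file
/// held exactly `nblocks` pages. Hypothetical standalone version of the
/// loop shared by import_relfile and import_slru_file.
fn read_all_pages<R: Read>(reader: &mut R, nblocks: u32) -> anyhow::Result<()> {
    let mut buf = [0u8; 8192];
    let mut blknum: u32 = 0;
    loop {
        match reader.read_exact(&mut buf) {
            Ok(()) => {
                // ... import page `blknum` into the repository here ...
                blknum += 1;
            }
            Err(err) if err.kind() == std::io::ErrorKind::UnexpectedEof => {
                // EOF is expected, but only once every page has been read;
                // a short file now fails here rather than after the loop.
                anyhow::ensure!(blknum == nblocks, "unexpected EOF");
                break;
            }
            Err(err) => return Err(err.into()),
        }
    }
    Ok(())
}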


@@ -4,10 +4,8 @@ use crate::repository::{key_range_size, singleton_range, Key};
use postgres_ffi::pg_constants;
// in # of key-value pairs
// FIXME Size of one segment in pages (128 MB)
pub const TARGET_FILE_SIZE_BYTES: u64 = 128 * 1024 * 1024;
pub const TARGET_FILE_SIZE: usize = (TARGET_FILE_SIZE_BYTES / 8192) as usize;
// Target file size, when creating image and delta layers
pub const TARGET_FILE_SIZE_BYTES: u64 = 128 * 1024 * 1024; // 128 MB
///
/// Represents a set of Keys, in a compact form.
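The removed TARGET_FILE_SIZE constant was derived arithmetic over the one that stays; a small sketch of the equivalence, assuming the 8192-byte Postgres page size from the removed line:

const TARGET_FILE_SIZE_BYTES: u64 = 128 * 1024 * 1024; // 128 MB

fn main() {
    // 128 MiB split into 8192-byte pages is 16384 key-value pairs
    // per target-sized layer file.
    assert_eq!(TARGET_FILE_SIZE_BYTES / 8192, 16384);
}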


@@ -1066,7 +1066,7 @@ impl LayeredTimeline {
// Recurse into ancestor if needed
if Lsn(cont_lsn.0 - 1) <= timeline.ancestor_lsn {
info!(
trace!(
"going into ancestor {}, cont_lsn is {}",
timeline.ancestor_lsn,
cont_lsn
@@ -1693,7 +1693,6 @@ impl LayeredTimeline {
"keeping {} because it's still might be referenced by child branch forked at {} is_dropped: xx is_incremental: {}",
l.filename().display(),
retain_lsn,
//is_dropped, // FIXME
l.is_incremental(),
);
result.layers_needed_by_branches += 1;
@@ -1800,7 +1799,7 @@ impl LayeredTimeline {
self.walredo_mgr
.request_redo(key, request_lsn, base_img, data.records)?;
// FIXME
// FIXME: page caching
/*
if let RelishTag::Relation(rel_tag) = &rel {
let cache = page_cache::get();


@@ -43,7 +43,7 @@ use crate::{ZTenantId, ZTimelineId};
use anyhow::{bail, Result};
use log::*;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use zenith_utils::vec_map::VecMap;
// avoid binding to Write (conflicts with std::io::Write)
// while being able to use std::fmt::Write's methods
@@ -158,16 +158,6 @@ impl Layer for DeltaLayer {
assert!(self.key_range.contains(&key));
/* FIXME
match &reconstruct_state.img {
Some((cached_lsn, _)) if &self.lsn_range.end <= cached_lsn => {
reconstruct_state.lsn = *cached_lsn;
return Ok(ValueReconstructResult::Complete);
}
_ => {}
}
*/
{
// Open the file and lock the metadata in memory
let inner = self.load()?;
@@ -181,15 +171,6 @@ impl Layer for DeltaLayer {
if let Some(vec_map) = inner.index.get(&key) {
let slice = vec_map.slice_range(lsn_range);
for (entry_lsn, pos) in slice.iter().rev() {
/* FIXME
match &reconstruct_state.img {
Some((cached_lsn, _)) if entry_lsn <= cached_lsn => {
return Ok(ValueReconstructResult::Complete);
}
_ => {}
}
*/
let val = Value::des(&utils::read_blob_from_chapter(&values_reader, *pos)?)?;
match val {
Value::Image(img) => {
@@ -221,14 +202,6 @@ impl Layer for DeltaLayer {
}
}
// Return a set of all distinct Keys present in this layer
fn collect_keys(&self, key_range: &Range<Key>, keys: &mut HashSet<Key>) -> Result<()> {
let inner = self.load()?;
keys.extend(inner.index.keys().filter(|x| key_range.contains(x)));
Ok(())
}
fn iter(&self) -> Box<dyn Iterator<Item = Result<(Key, Lsn, Value)>> + '_> {
let inner = self.load().unwrap();
@@ -647,6 +620,13 @@ impl DeltaLayerWriter {
}
}
///
/// Iterator over all key-value pairs stored in a delta layer
///
/// FIXME: This creates a Vector to hold the offsets of all key value pairs.
/// That takes up quite a lot of memory. Should do this in a more streaming
/// fashion.
///
struct DeltaValueIter<'a> {
all_offsets: Vec<(Key, Lsn, u64)>,
next_idx: usize,
@@ -662,13 +642,6 @@ impl<'a> Iterator for DeltaValueIter<'a> {
}
}
///
/// Iterator over all key-value pairs stored in a delta layer
///
/// FIXME: This creates a Vector to hold the offsets of all key value pairs.
/// That takes up quite a lot of memory. Should do this in a more streaming
/// fashion.
///
impl<'a> DeltaValueIter<'a> {
fn new(inner: RwLockReadGuard<'a, DeltaLayerInner>) -> Result<Self> {
let mut index: Vec<(&Key, &VecMap<Lsn, u64>)> = inner.index.iter().collect();

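As the relocated doc comment notes, DeltaValueIter materializes every (key, LSN, offset) triple before iterating. A simplified self-contained sketch of that flatten-then-iterate shape, with stand-in types instead of the real Key, Lsn and VecMap:

use std::collections::BTreeMap;

// Stand-ins for the real Key and Lsn types.
type Key = u64;
type Lsn = u64;

struct FlatValueIter {
    // Every (key, lsn, file offset) triple, collected eagerly up front.
    // This Vec is the memory cost the FIXME above complains about.
    all_offsets: Vec<(Key, Lsn, u64)>,
    next_idx: usize,
}

impl FlatValueIter {
    fn new(index: &BTreeMap<Key, Vec<(Lsn, u64)>>) -> Self {
        let mut all_offsets = Vec::new();
        for (key, versions) in index {
            for (lsn, pos) in versions {
                all_offsets.push((*key, *lsn, *pos));
            }
        }
        FlatValueIter { all_offsets, next_idx: 0 }
    }
}

impl Iterator for FlatValueIter {
    type Item = (Key, Lsn, u64);
    fn next(&mut self) -> Option<Self::Item> {
        let item = self.all_offsets.get(self.next_idx).copied();
        self.next_idx += 1;
        item
    }
}

The streaming alternative the FIXME asks for would walk the per-key entries lazily instead of building all_offsets first.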

@@ -47,7 +47,7 @@ impl Ord for DeltaFileName {
/// Represents the filename of a DeltaLayer
///
/// <seg start>-<seg end>-<LSN start>-<LSN end>
/// <key start>-<key end>__<LSN start>-<LSN end>
///
impl DeltaFileName {
///
@@ -136,8 +136,7 @@ impl Ord for ImageFileName {
///
/// Represents the filename of an ImageLayer
///
/// <spcnode>_<dbnode>_<relnode>_<forknum>_<seg>_<LSN>
/// FIXME
/// <key start>-<key end>__<LSN>
impl ImageFileName {
///
/// Parse a string as an image file name. Returns None if the filename does not

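The two comment fixes in this file bring the doc comments in line with the new key-range naming scheme. A hedged illustration of the formats, with hypothetical rendering details (the real Key and Lsn Display implementations may differ):

// <key start>-<key end>__<LSN start>-<LSN end>, for delta layer files.
fn delta_file_name(key_start: &str, key_end: &str, lsn_start: u64, lsn_end: u64) -> String {
    format!("{}-{}__{:016X}-{:016X}", key_start, key_end, lsn_start, lsn_end)
}

// <key start>-<key end>__<LSN>, for image layer files.
fn image_file_name(key_start: &str, key_end: &str, lsn: u64) -> String {
    format!("{}-{}__{:016X}", key_start, key_end, lsn)
}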

@@ -28,7 +28,7 @@ use anyhow::{bail, Context, Result};
use bytes::Bytes;
use log::*;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::fs;
use std::io::{BufWriter, Write};
use std::ops::Range;
@@ -137,16 +137,6 @@ impl Layer for ImageLayer {
assert!(self.key_range.contains(&key));
assert!(lsn_range.end >= self.lsn);
/* FIXME
match reconstruct_state.img {
Some((cached_lsn, _)) if self.lsn <= cached_lsn => {
reconstruct_state.lsn = cached_lsn;
return Ok(ValueReconstructResult::Complete);
}
_ => {}
}
*/
let inner = self.load()?;
if let Some(offset) = inner.index.get(&key) {
@@ -172,15 +162,6 @@ impl Layer for ImageLayer {
}
}
fn collect_keys(&self, key_range: &Range<Key>, keys: &mut HashSet<Key>) -> Result<()> {
let inner = self.load()?;
let index = &inner.index;
keys.extend(index.keys().filter(|x| key_range.contains(x)));
Ok(())
}
fn iter(&self) -> Box<dyn Iterator<Item = Result<(Key, Lsn, Value)>>> {
todo!();
}


@@ -15,7 +15,7 @@ use crate::repository::{Key, Value};
use crate::{ZTenantId, ZTimelineId};
use anyhow::Result;
use log::*;
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::ops::Range;
use std::path::PathBuf;
use std::sync::RwLock;
@@ -171,13 +171,6 @@ impl Layer for InMemoryLayer {
}
}
fn collect_keys(&self, key_range: &Range<Key>, keys: &mut HashSet<Key>) -> Result<()> {
let inner = self.inner.read().unwrap();
keys.extend(inner.index.keys().filter(|x| key_range.contains(x)));
Ok(())
}
fn iter(&self) -> Box<dyn Iterator<Item = Result<(Key, Lsn, Value)>>> {
todo!();
}


@@ -7,7 +7,6 @@ use crate::walrecord::ZenithWalRecord;
use crate::{ZTenantId, ZTimelineId};
use anyhow::Result;
use bytes::Bytes;
use std::collections::HashSet;
use std::ops::Range;
use std::path::PathBuf;
@@ -31,22 +30,20 @@ where
a.start == b.start && a.end == b.end
}
/// FIXME
/// Struct used to communicate across calls to 'get_page_reconstruct_data'.
/// Struct used to communicate across calls to 'get_value_reconstruct_data'.
///
/// Before first call to get_page_reconstruct_data, you can fill in 'page_img'
/// if you have an older cached version of the page available. That can save
/// work in 'get_page_reconstruct_data', as it can stop searching for page
/// versions when all the WAL records going back to the cached image have been
/// collected.
/// Before first call, you can fill in 'page_img' if you have an older cached
/// version of the page available. That can save work in
/// 'get_value_reconstruct_data', as it can stop searching for page versions
/// when all the WAL records going back to the cached image have been collected.
///
/// When get_page_reconstruct_data returns Complete, 'page_img' is set to an
/// image of the page, or the oldest WAL record in 'records' is a will_init-type
/// When get_value_reconstruct_data returns Complete, 'img' is set to an image
/// of the page, or the oldest WAL record in 'records' is a will_init-type
/// record that initializes the page without requiring a previous image.
///
/// If 'get_page_reconstruct_data' returns Continue, some 'records' may have
/// been collected, but there are more records outside the current layer. Pass
/// the same PageReconstructData struct in the next 'get_page_reconstruct_data'
/// the same ValueReconstructState struct in the next 'get_value_reconstruct_data'
/// call, to collect more records.
///
#[derive(Debug)]
@@ -70,13 +67,19 @@ pub enum ValueReconstructResult {
Missing,
}
/// FIXME
/// A Layer corresponds to one RELISH_SEG_SIZE slice of a relish in a range of LSNs.
/// A Layer contains all data in a "rectangle" consisting of a range of keys and
/// a range of LSNs.
///
/// There are two kinds of layers, in-memory and on-disk layers. In-memory
/// layers are used to ingest incoming WAL, and provide fast access
/// to the recent page versions. On-disk layers are stored as files on disk, and
/// are immutable. This trait presents the common functionality of
/// in-memory and on-disk layers.
/// layers are used to ingest incoming WAL, and provide fast access to the
/// recent page versions. On-disk layers are stored as files on disk, and are
/// immutable. This trait presents the common functionality of in-memory and
/// on-disk layers.
///
/// Furthermore, there are two kinds of on-disk layers: delta and image layers.
/// A delta layer contains all modifications within a range of LSNs and keys.
/// An image layer is a snapshot of all the data in a key-range, at a single
/// LSN.
///
pub trait Layer: Send + Sync {
fn get_tenant_id(&self) -> ZTenantId;
@@ -87,7 +90,6 @@ pub trait Layer: Send + Sync {
/// Range of segments that this layer covers
fn get_key_range(&self) -> Range<Key>;
/// FIXME
/// Inclusive start bound of the LSN range that this layer holds
/// Exclusive end bound of the LSN range that this layer holds.
///
@@ -129,11 +131,9 @@ pub trait Layer: Send + Sync {
/// Returns true for layers that are represented in memory.
fn is_in_memory(&self) -> bool;
/// Iterate through all keys and values stored in the layer
fn iter(&self) -> Box<dyn Iterator<Item = Result<(Key, Lsn, Value)>> + '_>;
/// Return a set of all distinct Keys present in this layer
fn collect_keys(&self, key_range: &Range<Key>, keys: &mut HashSet<Key>) -> Result<()>;
/// Release memory used by this layer. There is no corresponding 'load'
/// function, that's done implicitly when you call one of the get-functions.
fn unload(&self) -> Result<()>;
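To make the Complete/Continue contract described above concrete, here is a minimal self-contained sketch of the caller side; the types and the closure-based layer stand-in are simplified assumptions, not the actual Layer trait:

enum ReconstructResult {
    Complete,
    Continue,
    Missing,
}

struct ReconstructState {
    img: Option<(u64, Vec<u8>)>,  // cached (LSN, page image), if available
    records: Vec<(u64, Vec<u8>)>, // WAL records collected so far
}

// Each "layer" is modeled as a closure that appends to the shared state
// and reports whether the search can stop.
fn reconstruct_value(
    layers: &[&dyn Fn(&mut ReconstructState) -> ReconstructResult],
) -> Result<ReconstructState, String> {
    let mut state = ReconstructState { img: None, records: Vec::new() };
    // Visit layers from newest to oldest, passing the same state struct
    // into every call so records accumulate across layers.
    for layer in layers {
        match layer(&mut state) {
            ReconstructResult::Complete => return Ok(state),
            ReconstructResult::Continue => continue, // need older layers
            ReconstructResult::Missing => return Err("key not found".into()),
        }
    }
    Err("ran out of layers before reconstruction completed".into())
}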


@@ -1,6 +1,6 @@
//!
//! This provides an abstraction to store PostgreSQL relations and other files
//! in the key-value store
//! in the key-value store that implements the Repository interface.
//!
//! (TODO: The line between PUT-functions here and walingest.rs is a bit blurry, as
//! walingest.rs handles a few things like implicit relation creation and extension.
@@ -592,7 +592,6 @@ impl<'a, R: Repository> DatadirTimelineWriter<'a, R> {
// - update relish header with size
pub fn put_rel_creation(&mut self, rel: RelTag, nblocks: BlockNumber) -> Result<()> {
info!("CREAT: {}", rel);
// Add it to the directory entry
let dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
let buf = self.get(dir_key)?;


@@ -242,7 +242,7 @@ fn walreceiver_main(
let startlsn = Lsn::from(xlog_data.wal_start());
let endlsn = startlsn + data.len() as u64;
info!("received XLogData between {} and {}", startlsn, endlsn);
trace!("received XLogData between {} and {}", startlsn, endlsn);
waldecoder.feed_bytes(data);