Introduce a new "layered" repository implementation.

This replaces the RocksDB based implementation with an approach using
"snapshot files" on disk, and in-memory btreemaps to hold the recent
changes.

This make the repository implementation a configuration option. You can
choose 'layered' or 'rocksdb' with "zenith init --repository-format=<format>"
The unit tests have been refactored to exercise both implementations.
'layered' is now the default.

Push/pull is not implemented. The 'test_history_inmemory' test has been
commented out accordingly. It's not clear how we will implement that
functionality; probably by copying the snapshot files directly.
This commit is contained in:
Heikki Linnakangas
2021-08-16 10:06:48 +03:00
parent 5eb1738e8b
commit 2450f82de5
24 changed files with 3435 additions and 75 deletions

99
Cargo.lock generated
View File

@@ -1,7 +1,5 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 3
[[package]]
name = "ahash"
version = "0.4.7"
@@ -82,6 +80,30 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a"
[[package]]
name = "aversion"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41992ab8cfcc3026ef9abceffe0c2b0479c043183fc23825e30d22baab6df334"
dependencies = [
"aversion-macros",
"byteorder",
"serde",
"serde_cbor",
"thiserror",
]
[[package]]
name = "aversion-macros"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ba5785f953985aa0caca927ba4005880f3b4f53de87f134e810ae3549f744d2"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "aws-creds"
version = "0.26.0"
@@ -166,6 +188,18 @@ dependencies = [
"generic-array",
]
[[package]]
name = "bookfile"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "efa3e2086414e1bbecbc10730f265e5b079ab4ea0b830e7219a70dab6471e753"
dependencies = [
"aversion",
"byteorder",
"serde",
"thiserror",
]
[[package]]
name = "boxfnonce"
version = "0.1.1"
@@ -646,6 +680,12 @@ dependencies = [
"tracing",
]
[[package]]
name = "half"
version = "1.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62aca2aba2d62b4a7f5b33f3712cb1b0692779a56fb510499d5c0aa594daeaf3"
[[package]]
name = "hashbrown"
version = "0.9.1"
@@ -1139,6 +1179,7 @@ name = "pageserver"
version = "0.1.0"
dependencies = [
"anyhow",
"bookfile",
"byteorder",
"bytes",
"chrono",
@@ -1276,24 +1317,6 @@ dependencies = [
"tokio-postgres 0.7.1",
]
[[package]]
name = "postgres-protocol"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff3e0f70d32e20923cabf2df02913be7c1842d4c772db8065c00fcfdd1d1bff3"
dependencies = [
"base64 0.13.0",
"byteorder",
"bytes",
"fallible-iterator",
"hmac",
"md-5",
"memchr",
"rand",
"sha2",
"stringprep",
]
[[package]]
name = "postgres-protocol"
version = "0.6.1"
@@ -1313,14 +1336,21 @@ dependencies = [
]
[[package]]
name = "postgres-types"
version = "0.2.1"
name = "postgres-protocol"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "430f4131e1b7657b0cd9a2b0c3408d77c9a43a042d300b8c77f981dffcc43a2f"
checksum = "ff3e0f70d32e20923cabf2df02913be7c1842d4c772db8065c00fcfdd1d1bff3"
dependencies = [
"base64 0.13.0",
"byteorder",
"bytes",
"fallible-iterator",
"postgres-protocol 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)",
"hmac",
"md-5",
"memchr",
"rand",
"sha2",
"stringprep",
]
[[package]]
@@ -1333,6 +1363,17 @@ dependencies = [
"postgres-protocol 0.6.1 (git+https://github.com/zenithdb/rust-postgres.git?rev=9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858)",
]
[[package]]
name = "postgres-types"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "430f4131e1b7657b0cd9a2b0c3408d77c9a43a042d300b8c77f981dffcc43a2f"
dependencies = [
"bytes",
"fallible-iterator",
"postgres-protocol 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "postgres_ffi"
version = "0.1.0"
@@ -1735,6 +1776,16 @@ dependencies = [
"xml-rs",
]
[[package]]
name = "serde_cbor"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e18acfa2f90e8b735b2836ab8d538de304cbb6729a7360729ea5a895d15a622"
dependencies = [
"half",
"serde",
]
[[package]]
name = "serde_derive"
version = "1.0.126"

View File

@@ -42,6 +42,9 @@ pub struct LocalEnv {
#[serde(with = "hex")]
pub tenantid: ZTenantId,
// Repository format, 'rocksdb' or 'layered' or None for default
pub repository_format: Option<String>,
// jwt auth token used for communication with pageserver
pub auth_token: String,
@@ -101,6 +104,7 @@ pub fn init(
remote_pageserver: Option<&str>,
tenantid: ZTenantId,
auth_type: AuthType,
repository_format: Option<&str>,
) -> Result<()> {
// check if config already exists
let base_path = base_path();
@@ -176,6 +180,7 @@ pub fn init(
base_data_dir: base_path,
remotes: BTreeMap::default(),
tenantid,
repository_format: repository_format.map(|x| x.into()),
auth_token,
auth_type,
private_key_path,
@@ -194,6 +199,7 @@ pub fn init(
base_data_dir: base_path,
remotes: BTreeMap::default(),
tenantid,
repository_format: repository_format.map(|x| x.into()),
auth_token,
auth_type,
private_key_path,

View File

@@ -50,7 +50,7 @@ impl PageServerNode {
.unwrap()
}
pub fn init(&self, create_tenant: Option<&str>, enable_auth: bool) -> Result<()> {
pub fn init(&self, create_tenant: Option<&str>, enable_auth: bool, repository_format: Option<&str>) -> Result<()> {
let mut cmd = Command::new(self.env.pageserver_bin()?);
let mut args = vec![
"--init",
@@ -65,6 +65,10 @@ impl PageServerNode {
args.extend(&["--auth-type", "ZenithJWT"]);
}
if let Some(repo_format) = repository_format {
args.extend(&["--repository-format", repo_format]);
}
create_tenant.map(|tenantid| args.extend(&["--create-tenant", tenantid]));
let status = cmd
.args(args)

View File

@@ -7,6 +7,7 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
bookfile = "^0.3"
chrono = "0.4.19"
rand = "0.8.3"
regex = "1.4.5"

View File

@@ -20,14 +20,14 @@ use anyhow::{ensure, Result};
use clap::{App, Arg, ArgMatches};
use daemonize::Daemonize;
use pageserver::{branches, logger, page_cache, page_service, PageServerConf};
use pageserver::{branches, logger, page_cache, page_service, PageServerConf, RepositoryFormat};
use zenith_utils::http_endpoint;
const DEFAULT_LISTEN_ADDR: &str = "127.0.0.1:64000";
const DEFAULT_HTTP_ENDPOINT_ADDR: &str = "127.0.0.1:9898";
const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(100);
const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(10);
const DEFAULT_SUPERUSER: &str = "zenith_admin";
@@ -41,6 +41,7 @@ struct CfgFileParams {
pg_distrib_dir: Option<String>,
auth_validation_public_key_path: Option<String>,
auth_type: Option<String>,
repository_format: Option<String>,
}
impl CfgFileParams {
@@ -58,6 +59,7 @@ impl CfgFileParams {
pg_distrib_dir: get_arg("postgres-distrib"),
auth_validation_public_key_path: get_arg("auth-validation-public-key-path"),
auth_type: get_arg("auth-type"),
repository_format: get_arg("repository-format"),
}
}
@@ -74,6 +76,7 @@ impl CfgFileParams {
.auth_validation_public_key_path
.or(other.auth_validation_public_key_path),
auth_type: self.auth_type.or(other.auth_type),
repository_format: self.repository_format.or(other.repository_format),
}
}
@@ -133,6 +136,16 @@ impl CfgFileParams {
);
}
let repository_format = match self.repository_format.as_ref() {
Some(repo_format_str) if repo_format_str == "rocksdb" => RepositoryFormat::RocksDb,
Some(repo_format_str) if repo_format_str == "layered" => RepositoryFormat::Layered,
Some(repo_format_str) => anyhow::bail!(
"invalid --repository-format '{}', must be 'rocksdb' or 'layered'",
repo_format_str
),
None => RepositoryFormat::Layered, // default
};
Ok(PageServerConf {
daemonize: false,
@@ -148,8 +161,9 @@ impl CfgFileParams {
pg_distrib_dir,
auth_validation_public_key_path,
auth_type,
repository_format,
})
}
}
@@ -221,6 +235,12 @@ fn main() -> Result<()> {
.takes_value(true)
.help("Authentication scheme type. One of: Trust, MD5, ZenithJWT"),
)
.arg(
Arg::with_name("repository-format")
.long("repository-format")
.takes_value(true)
.help("Which repository implementation to use, 'rocksdb' or 'layered'"),
)
.get_matches();
let workdir = Path::new(arg_matches.value_of("workdir").unwrap_or(".zenith"));

View File

@@ -24,7 +24,7 @@ use crate::object_repository::ObjectRepository;
use crate::page_cache;
use crate::restore_local_repo;
use crate::walredo::WalRedoManager;
use crate::{repository::Repository, PageServerConf};
use crate::{repository::Repository, PageServerConf, RepositoryFormat};
#[derive(Serialize, Deserialize, Clone)]
pub struct BranchInfo {
@@ -65,8 +65,8 @@ pub fn init_pageserver(conf: &'static PageServerConf, create_tenant: Option<&str
pub fn create_repo(
conf: &'static PageServerConf,
tenantid: ZTenantId,
wal_redo_manager: Arc<dyn WalRedoManager>,
) -> Result<ObjectRepository> {
wal_redo_manager: Arc<dyn WalRedoManager + Send + Sync>,
) -> Result<Arc<dyn Repository>> {
let repo_dir = conf.tenant_path(&tenantid);
if repo_dir.exists() {
bail!("repo for {} already exists", tenantid)
@@ -96,19 +96,27 @@ pub fn create_repo(
// and we failed to run initdb again in the same directory. This has been solved for the
// rapid init+start case now, but the general race condition remains if you restart the
// server quickly.
let storage = crate::rocksdb_storage::RocksObjectStore::create(conf, &tenantid)?;
let repo: Arc<dyn Repository + Sync + Send> =
match conf.repository_format {
RepositoryFormat::Layered => Arc::new(
crate::layered_repository::LayeredRepository::new(conf, wal_redo_manager, tenantid),
),
RepositoryFormat::RocksDb => {
let obj_store = crate::rocksdb_storage::RocksObjectStore::create(conf, &tenantid)?;
let repo = crate::object_repository::ObjectRepository::new(
conf,
std::sync::Arc::new(storage),
wal_redo_manager,
tenantid,
);
Arc::new(ObjectRepository::new(
conf,
Arc::new(obj_store),
wal_redo_manager,
tenantid,
))
}
};
// Load data into pageserver
// TODO To implement zenith import we need to
// move data loading out of create_repo()
bootstrap_timeline(conf, tenantid, tli, &repo)?;
bootstrap_timeline(conf, tenantid, tli, &*repo)?;
Ok(repo)
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,298 @@
# Overview
The on-disk format is based on immutable files. The page server
receives a stream of incoming WAL, parses the WAL records to determine
which pages they apply to, and accumulates the incoming changes in
memory. Every now and then, the accumulated changes are written out to
new files.
The files are called "snapshot files". Each snapshot file corresponds
to one PostgreSQL relation fork. The snapshot files for each timeline
are stored in the timeline's subdirectory under
.zenith/tenants/<tenantid>/timelines.
The files are named like this:
rel_<spcnode>_<dbnode>_<relnode>_<forknum>_<start LSN>_<end LSN>
For example:
rel_1663_13990_2609_0_000000000169C348_0000000001702000
Some non-relation files are also stored in repository. For example,
a CLOG segment would be named like this:
pg_xact_0000_00000000198B06B0_00000000198C2550
There is no difference in how the relation and non-relation files are
managed, except that the first part of file names is different.
Internally, the relations and non-relation files that are managed in
the versioned store are together called "relishes".
Each snapshot file contains a full snapshot, that is, full copy of all
pages in the relation, as of the "start LSN". It also contains all WAL
records applicable to the relation between the start and end
LSNs. With this information, the page server can reconstruct any page
version of the relation in the LSN range.
If a file has been dropped, the last snapshot file for it is created
with the _DROPPED suffix, e.g.
rel_1663_13990_2609_0_000000000169C348_0000000001702000_DROPPED
In addition to the relations, with "rel_*" prefix, we use the same
format for storing various smaller files from the PostgreSQL data
directory. They will use different suffixes and the naming scheme
up to the LSN range varies. The Zenith source code uses the term
"relish" to mean "a relation, or other file that's treated like a
relation in the storage"
## Notation used in this document
The full path of a snapshot file looks like this:
.zenith/tenants/941ddc8604413b88b3d208bddf90396c/timelines/4af489b06af8eed9e27a841775616962/rel_1663_13990_2609_0_000000000169C348_0000000001702000
For simplicity, the examples below use a simplified notation for the
paths. The tenant ID is left out, the timeline ID is replaced with
the human-readable branch name, and spcnode+dbnode+relnode+forkum with
a human-readable table name. The LSNs are also shorter. For example, a
snapshot file for 'orders' table on 'main' branch, with LSN range
100-200 would be:
main/orders_100_200
# Creating snapshot files
Let's start with a simple example with a system that contains one
branch called 'main' and two tables, 'orders' and 'customers'. The end
of WAL is currently at LSN 250. In this starting situation, you would
have two files on disk:
main/orders_100_200
main/customers_100_200
In addition to those files, the recent changes between LSN 200 and the
end of WAL at 250 are kept in memory. If the page server crashes, the
latest records between 200-250 need to be re-read from the WAL.
Whenever enough WAL has been accumulated in memory, the page server
writes out the changes in memory into new snapshot files. This process
is called "checkpointing" (not to be confused with the PostgreSQL
checkpoints, that's a different thing). The page server only creates
snapshot files for relations that have been modified since the last
checkpoint. For example, if the current end of WAL is at LSN 450, and
the last checkpoint happened at LSN 400 but there hasn't been any
recent changes to 'customers' table, you would have these files on
disk:
main/orders_100_200
main/orders_200_300
main/orders_300_400
main/customers_100_200
If the customers table is modified later, a new file is created for it
at the next checkpoint. The new file will cover the "gap" from the
last snapshot file, so the LSN ranges are always contiguous:
main/orders_100_200
main/orders_200_300
main/orders_300_400
main/customers_100_200
main/customers_200_500
## Reading page versions
Whenever a GetPage@LSN request comes in from the compute node, the
page server needs to reconstruct the requested page, as it was at the
requested LSN. To do that, the page server first checks the recent
in-memory layer; if the requested page version is found there, it can
be returned immediatedly without looking at the files on
disk. Otherwise the page server needs to locate the snapshot file that
contains the requested page version.
For example, if a request comes in for table 'orders' at LSN 250, the
page server would load the 'main/orders_200_300' file into memory, and
reconstruct and return the requested page from it, as it was at
LSN 250. Because the snapshot file consists of a full image of the
relation at the start LSN and the WAL, reconstructing the page
involves replaying any WAL records applicable to the page between LSNs
200-250, starting from the base image at LSN 200.
A request at a file boundary can be satisfied using either file. For
example, if there are two files on disk:
main/orders_100_200
main/orders_200_300
And a request comes with LSN 200, either file can be used for it. It
is better to use the later file, however, because it contains an
already materialized version of all the pages at LSN 200. Using the
first file, you would need to apply any WAL records between 100 and
200 to reconstruct the requested page.
# Multiple branches
Imagine that a child branch is created at LSN 250:
@250
----main--+-------------------------->
\
+---child-------------->
Then, the 'orders' table is updated differently on the 'main' and
'child' branches. You now have this situation on disk:
main/orders_100_200
main/orders_200_300
main/orders_300_400
main/customers_100_200
child/orders_250_300
child/orders_300_400
Because the 'customers' table hasn't been modified on the child
branch, there is no file for it there. If you request a page for it on
the 'child' branch, the page server will not find any snapshot file
for it in the 'child' directory, so it will recurse to look into the
parent 'main' branch instead.
From the 'child' branch's point of view, the history for each relation
is linear, and the request's LSN identifies unambiguously which file
you need to look at. For example, the history for the 'orders' table
on the 'main' branch consists of these files:
main/orders_100_200
main/orders_200_300
main/orders_300_400
And from the 'child' branch's point of view, it consists of these
files:
main/orders_100_200
main/orders_200_300
child/orders_250_300
child/orders_300_400
The branch metadata includes the point where the child branch was
created, LSN 250. If a page request comes with LSN 275, we read the
page version from the 'child/orders_250_300' file. If the request LSN
is 225, we read it from the 'main/orders_200_300' file instead. The
page versions between 250-300 in the 'main/orders_200_300' file are
ignored when operating on the child branch.
Note: It doesn't make any difference if the child branch is created
when the end of the main branch was at LSN 250, or later when the tip of
the main branch had already moved on. The latter case, creating a
branch at a historic LSN, is how we support PITR in Zenith.
# Garbage collection
In this scheme, we keep creating new snapshot files over time. We also
need a mechanism to remove old files that are no longer needed,
because disk space isn't infinite.
What files are still needed? Currently, the page server supports PITR
and branching from any branch at any LSN that is "recent enough" from
the tip of the branch. "Recent enough" is defined as an LSN horizon,
which by default is 64 MB. (See DEFAULT_GC_HORIZON). For this
example, let's assume that the LSN horizon is 150 units.
Let's look at the single branch scenario again. Imagine that the end
of the branch is LSN 525, so that the GC horizon is currently at
525-150 = 375
main/orders_100_200
main/orders_200_300
main/orders_300_400
main/orders_400_500
main/customers_100_200
We can remove files 'main/orders_100_200' and 'main/orders_200_300',
because the end LSNs of those files are older than GC horizon 375, and
there are more recent snapshot files for the table. 'main/orders_300_400'
and 'main/orders_400_500' are still within the horizon, so they must be
retained. 'main/customers_100_200' is old enough, but it cannot be
removed because there is no newer snapshot file for the table.
Things get slightly more complicated with multiple branches. All of
the above still holds, but in addition to recent files we must also
retain older shapshot files that are still needed by child branches.
For example, if child branch is created at LSN 150, and the 'customers'
table is updated on the branch, you would have these files:
main/orders_100_200
main/orders_200_300
main/orders_300_400
main/orders_400_500
main/customers_100_200
child/customers_150_300
In this situation, the 'main/orders_100_200' file cannot be removed,
even though it is older than the GC horizon, because it is still
needed by the child branch. 'main/orders_200_300' can still be
removed. So after garbage collection, these files would remain:
main/orders_100_200
main/orders_300_400
main/orders_400_500
main/customers_100_200
child/customers_150_300
If 'orders' is modified later on the 'child' branch, we will create a
snapshot file for it on the child:
main/orders_100_200
main/orders_300_400
main/orders_400_500
main/customers_100_200
child/customers_150_300
child/orders_150_400
After this, the 'main/orders_100_200' file can be removed. It is no
longer needed by the child branch, because there is a newer snapshot
file there. TODO: This optimization hasn't been implemented! The GC
algorithm will currently keep the file on the 'main' branch anyway, for
as long as the child branch exists.
# TODO: On LSN ranges
In principle, each relation can be checkpointed separately, i.e. the
LSN ranges of the files don't need to line up. So this would be legal:
main/orders_100_200
main/orders_200_300
main/orders_300_400
main/customers_150_250
main/customers_250_500
However, the code currently always checkpoints all relations together.
So that situation doesn't arise in practice.
It would also be OK to have overlapping LSN ranges for the same relation:
main/orders_100_200
main/orders_200_300
main/orders_250_350
main/orders_300_400
The code that reads the snapshot files should cope with this, but this
situation doesn't arise either, because the checkpointing code never
does that. It could be useful, however, as a transient state when
garbage collecting around branch points, or explicit recovery
points. For example, if we start with this:
main/orders_100_200
main/orders_200_300
main/orders_300_400
And there is a branch or explicit recovery point at LSN 150, we could
replace 'main/orders_100_200' with 'main/orders_150_150' to keep a
snapshot only at that exact point that's still needed, removing the
other page versions around it. But such compaction has not been
implemented yet.

View File

@@ -0,0 +1,534 @@
//!
//! An in-memory layer stores recently received page versions in memory. The page versions
//! are held in a BTreeMap, and there's another BTreeMap to track the size of the relation.
//!
use crate::layered_repository::storage_layer::Layer;
use crate::layered_repository::storage_layer::PageVersion;
use crate::layered_repository::SnapshotLayer;
use crate::relish::*;
use crate::repository::WALRecord;
use crate::walredo::WalRedoManager;
use crate::PageServerConf;
use crate::{ZTenantId, ZTimelineId};
use anyhow::{bail, Result};
use bytes::Bytes;
use log::*;
use std::collections::BTreeMap;
use std::ops::Bound::Included;
use std::sync::{Arc, Mutex};
use zenith_utils::lsn::Lsn;
static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]);
pub struct InMemoryLayer {
conf: &'static PageServerConf,
tenantid: ZTenantId,
timelineid: ZTimelineId,
rel: RelishTag,
///
/// This layer contains all the changes from 'start_lsn'. The
/// start is inclusive. There is no end LSN; we only use in-memory
/// layer at the end of a timeline.
///
start_lsn: Lsn,
/// The above fields never change. The parts that do change are in 'inner',
/// and protected by mutex.
inner: Mutex<InMemoryLayerInner>,
}
pub struct InMemoryLayerInner {
/// If this relation was dropped, remember when that happened.
drop_lsn: Option<Lsn>,
///
/// All versions of all pages in the layer are are kept here.
/// Indexed by block number and LSN.
///
page_versions: BTreeMap<(u32, Lsn), PageVersion>,
///
/// `relsizes` tracks the size of the relation at different points in time.
///
relsizes: BTreeMap<Lsn, u32>,
}
impl Layer for InMemoryLayer {
fn is_frozen(&self) -> bool {
return false;
}
fn get_timeline_id(&self) -> ZTimelineId {
return self.timelineid;
}
fn get_relish_tag(&self) -> RelishTag {
return self.rel;
}
fn get_start_lsn(&self) -> Lsn {
return self.start_lsn;
}
fn get_end_lsn(&self) -> Lsn {
return Lsn(u64::MAX);
}
fn is_dropped(&self) -> bool {
let inner = self.inner.lock().unwrap();
inner.drop_lsn.is_some()
}
/// Look up given page in the cache.
fn get_page_at_lsn(
&self,
walredo_mgr: &dyn WalRedoManager,
blknum: u32,
lsn: Lsn,
) -> Result<Bytes> {
// Scan the BTreeMap backwards, starting from the given entry.
let mut records: Vec<WALRecord> = Vec::new();
let mut page_img: Option<Bytes> = None;
let mut need_base_image_lsn: Option<Lsn> = Some(lsn);
{
let inner = self.inner.lock().unwrap();
let minkey = (blknum, Lsn(0));
let maxkey = (blknum, lsn);
let mut iter = inner
.page_versions
.range((Included(&minkey), Included(&maxkey)));
while let Some(((_blknum, entry_lsn), entry)) = iter.next_back() {
if let Some(img) = &entry.page_image {
page_img = Some(img.clone());
need_base_image_lsn = None;
break;
} else if let Some(rec) = &entry.record {
records.push(rec.clone());
if rec.will_init {
// This WAL record initializes the page, so no need to go further back
need_base_image_lsn = None;
break;
} else {
need_base_image_lsn = Some(*entry_lsn);
}
} else {
// No base image, and no WAL record. Huh?
bail!("no page image or WAL record for requested page");
}
}
// release lock on 'page_versions'
}
records.reverse();
// If we needed a base image to apply the WAL records against, we should have found it in memory.
if let Some(lsn) = need_base_image_lsn {
if records.is_empty() {
// no records, and no base image. This can happen if PostgreSQL extends a relation
// but never writes the page.
//
// Would be nice to detect that situation better.
warn!("Page {} blk {} at {} not found", self.rel, blknum, lsn);
return Ok(ZERO_PAGE.clone());
}
bail!(
"No base image found for page {} blk {} at {}/{}",
self.rel,
blknum,
self.timelineid,
lsn
);
}
// If we have a page image, and no WAL, we're all set
if records.is_empty() {
if let Some(img) = page_img {
trace!(
"found page image for blk {} in {} at {}/{}, no WAL redo required",
blknum,
self.rel,
self.timelineid,
lsn
);
Ok(img)
} else {
// FIXME: this ought to be an error?
warn!("Page {} blk {} at {} not found", self.rel, blknum, lsn);
Ok(ZERO_PAGE.clone())
}
} else {
// We need to do WAL redo.
//
// If we don't have a base image, then the oldest WAL record better initialize
// the page
if page_img.is_none() && !records.first().unwrap().will_init {
// FIXME: this ought to be an error?
warn!(
"Base image for page {}/{} at {} not found, but got {} WAL records",
self.rel,
blknum,
lsn,
records.len()
);
Ok(ZERO_PAGE.clone())
} else {
if page_img.is_some() {
trace!("found {} WAL records and a base image for blk {} in {} at {}/{}, performing WAL redo", records.len(), blknum, self.rel, self.timelineid, lsn);
} else {
trace!("found {} WAL records that will init the page for blk {} in {} at {}/{}, performing WAL redo", records.len(), blknum, self.rel, self.timelineid, lsn);
}
let img = walredo_mgr.request_redo(self.rel, blknum, lsn, page_img, records)?;
self.put_page_image(blknum, lsn, img.clone())?;
Ok(img)
}
}
}
/// Get size of the relation at given LSN
fn get_relish_size(&self, lsn: Lsn) -> Result<Option<u32>> {
// Scan the BTreeMap backwards, starting from the given entry.
let inner = self.inner.lock().unwrap();
let mut iter = inner.relsizes.range((Included(&Lsn(0)), Included(&lsn)));
if let Some((_entry_lsn, entry)) = iter.next_back() {
let result = *entry;
drop(inner);
trace!("get_relish_size: {} at {} -> {}", self.rel, lsn, result);
Ok(Some(result))
} else {
Ok(None)
}
}
/// Does this relation exist at given LSN?
fn get_rel_exists(&self, lsn: Lsn) -> Result<bool> {
let inner = self.inner.lock().unwrap();
// Is the requested LSN after the rel was dropped?
if let Some(drop_lsn) = inner.drop_lsn {
if lsn >= drop_lsn {
return Ok(false);
}
}
// Otherwise, it exists
Ok(true)
}
// Write operations
/// Common subroutine of the public put_wal_record() and put_page_image() functions.
/// Adds the page version to the in-memory tree
fn put_page_version(&self, blknum: u32, lsn: Lsn, pv: PageVersion) -> Result<()> {
trace!(
"put_page_version blk {} of {} at {}/{}",
blknum,
self.rel,
self.timelineid,
lsn
);
let mut inner = self.inner.lock().unwrap();
let old = inner.page_versions.insert((blknum, lsn), pv);
if old.is_some() {
// We already had an entry for this LSN. That's odd..
warn!(
"Page version of rel {:?} blk {} at {} already exists",
self.rel, blknum, lsn
);
}
// Also update the relation size, if this extended the relation.
if self.rel.is_blocky() {
let mut iter = inner.relsizes.range((Included(&Lsn(0)), Included(&lsn)));
let oldsize;
if let Some((_entry_lsn, entry)) = iter.next_back() {
oldsize = *entry;
} else {
oldsize = 0;
//bail!("No old size found for {} at {}", self.tag, lsn);
}
if blknum >= oldsize {
trace!(
"enlarging relation {} from {} to {} blocks at {}",
self.rel,
oldsize,
blknum + 1,
lsn
);
inner.relsizes.insert(lsn, blknum + 1);
}
}
Ok(())
}
/// Remember that the relation was truncated at given LSN
fn put_truncation(&self, lsn: Lsn, relsize: u32) -> anyhow::Result<()> {
let mut inner = self.inner.lock().unwrap();
let old = inner.relsizes.insert(lsn, relsize);
if old.is_some() {
// We already had an entry for this LSN. That's odd..
warn!("Inserting truncation, but had an entry for the LSN already");
}
Ok(())
}
/// Remember that the relation was dropped at given LSN
fn put_unlink(&self, lsn: Lsn) -> anyhow::Result<()> {
let mut inner = self.inner.lock().unwrap();
assert!(inner.drop_lsn.is_none());
inner.drop_lsn = Some(lsn);
info!("dropped relation {} at {}", self.rel, lsn);
Ok(())
}
///
/// Write the this in-memory layer to disk, as a snapshot layer.
///
/// The cutoff point for the layer that's written to disk is 'end_lsn'.
///
/// Returns new layers that replace this one. Always returns a
/// SnapshotLayer containing the page versions that were written to disk,
/// but if there were page versions newer than 'end_lsn', also return a new
/// in-memory layer containing those page versions. The caller replaces
/// this layer with the returned layers in the layer map.
///
fn freeze(
&self,
cutoff_lsn: Lsn,
walredo_mgr: &dyn WalRedoManager,
) -> Result<Vec<Arc<dyn Layer>>> {
info!(
"freezing in memory layer for {} on timeline {} at {}",
self.rel, self.timelineid, cutoff_lsn
);
let inner = self.inner.lock().unwrap();
// Normally, use the cutoff LSN as the end of the frozen layer.
// But if the relation was dropped, we know that there are no
// more changes coming in for it, and in particular we know that
// there are no changes "in flight" for the LSN anymore, so we use
// the drop LSN instead. The drop-LSN could be ahead of the
// caller-specified LSN!
let dropped = inner.drop_lsn.is_some();
let end_lsn =
if dropped {
inner.drop_lsn.unwrap()
} else {
cutoff_lsn
};
// Divide all the page versions into old and new at the 'end_lsn' cutoff point.
let mut before_page_versions;
let mut before_relsizes;
let mut after_page_versions;
let mut after_relsizes;
if !dropped {
before_relsizes = BTreeMap::new();
after_relsizes = BTreeMap::new();
for (lsn, size) in inner.relsizes.iter() {
if *lsn > end_lsn {
after_relsizes.insert(*lsn, *size);
} else {
before_relsizes.insert(*lsn, *size);
}
}
before_page_versions = BTreeMap::new();
after_page_versions = BTreeMap::new();
for ((blknum, lsn), pv) in inner.page_versions.iter() {
if *lsn > end_lsn {
after_page_versions.insert((*blknum, *lsn), pv.clone());
} else {
before_page_versions.insert((*blknum, *lsn), pv.clone());
}
}
} else {
before_page_versions = inner.page_versions.clone();
before_relsizes = inner.relsizes.clone();
after_relsizes = BTreeMap::new();
after_page_versions = BTreeMap::new();
}
// we can release the lock now.
drop(inner);
// Write the page versions before the cutoff to disk.
let snapfile = SnapshotLayer::create(
self.conf,
self.timelineid,
self.tenantid,
self.rel,
self.start_lsn,
end_lsn,
dropped,
before_page_versions,
before_relsizes,
)?;
let mut result: Vec<Arc<dyn Layer>> = Vec::new();
// If there were any page versions after the cutoff, initialize a new in-memory layer
// to hold them
if !after_relsizes.is_empty() || !after_page_versions.is_empty() {
info!("created new in-mem layer for {} {}-", self.rel, end_lsn);
let new_layer = Self::copy_snapshot(
self.conf,
walredo_mgr,
&snapfile,
self.timelineid,
self.tenantid,
end_lsn,
)?;
let mut new_inner = new_layer.inner.lock().unwrap();
new_inner.page_versions.append(&mut after_page_versions);
new_inner.relsizes.append(&mut after_relsizes);
drop(new_inner);
result.push(Arc::new(new_layer));
}
result.push(Arc::new(snapfile));
Ok(result)
}
fn delete(&self) -> Result<()> {
// Nothing to do. When the reference is dropped, the memory is released.
Ok(())
}
fn unload(&self) -> Result<()> {
// cannot unload in-memory layer. Freeze instead
Ok(())
}
}
impl InMemoryLayer {
///
/// Create a new, empty, in-memory layer
///
pub fn create(
conf: &'static PageServerConf,
timelineid: ZTimelineId,
tenantid: ZTenantId,
rel: RelishTag,
start_lsn: Lsn,
) -> Result<InMemoryLayer> {
trace!(
"initializing new empty InMemoryLayer for writing {} on timeline {} at {}",
rel,
timelineid,
start_lsn
);
Ok(InMemoryLayer {
conf,
timelineid,
tenantid,
rel,
start_lsn,
inner: Mutex::new(InMemoryLayerInner {
drop_lsn: None,
page_versions: BTreeMap::new(),
relsizes: BTreeMap::new(),
}),
})
}
///
/// Initialize a new InMemoryLayer for, by copying the state at the given
/// point in time from given existing layer.
///
pub fn copy_snapshot(
conf: &'static PageServerConf,
walredo_mgr: &dyn WalRedoManager,
src: &dyn Layer,
timelineid: ZTimelineId,
tenantid: ZTenantId,
lsn: Lsn,
) -> Result<InMemoryLayer> {
trace!(
"initializing new InMemoryLayer for writing {} on timeline {} at {}",
src.get_relish_tag(),
timelineid,
lsn
);
let mut page_versions = BTreeMap::new();
let mut relsizes = BTreeMap::new();
let size;
if src.get_relish_tag().is_blocky() {
if let Some(sz) = src.get_relish_size(lsn)? {
relsizes.insert(lsn, sz);
size = sz;
} else {
bail!("no size found or {} at {}", src.get_relish_tag(), lsn);
}
} else {
size = 1;
}
for blknum in 0..size {
let img = src.get_page_at_lsn(walredo_mgr, blknum, lsn)?;
let pv = PageVersion {
page_image: Some(img),
record: None,
};
page_versions.insert((blknum, lsn), pv);
}
Ok(InMemoryLayer {
conf,
timelineid,
tenantid,
rel: src.get_relish_tag(),
start_lsn: lsn,
inner: Mutex::new(InMemoryLayerInner {
drop_lsn: None,
page_versions: page_versions,
relsizes: relsizes,
}),
})
}
/// debugging function to print out the contents of the layer
#[allow(unused)]
pub fn dump(&self) -> String {
let mut result = format!(
"----- inmemory layer for {} {}-> ----\n",
self.rel, self.start_lsn
);
let inner = self.inner.lock().unwrap();
for (k, v) in inner.relsizes.iter() {
result += &format!("{}: {}\n", k, v);
}
for (k, v) in inner.page_versions.iter() {
result += &format!(
"blk {} at {}: {}/{}\n",
k.0,
k.1,
v.page_image.is_some(),
v.record.is_some()
);
}
result
}
}

View File

@@ -0,0 +1,132 @@
//!
//! The layer map tracks what layers exist for all the relations in a timeline.
//!
//! When the timeline is first accessed, the server lists of all snapshot files
//! in the timelines/<timelineid> directory, and populates this map with
//! SnapshotLayers corresponding to each file. When new WAL is received,
//! we create InMemoryLayers to hold the incoming records. Now and then,
//! in the checkpoint() function, the in-memory layers are frozen, forming
//! new snapshot layers and corresponding files are written to disk.
//!
use crate::layered_repository::storage_layer::Layer;
use crate::relish::*;
use anyhow::Result;
use log::*;
use std::collections::BTreeMap;
use std::collections::HashSet;
use std::ops::Bound::Included;
use std::sync::Arc;
use zenith_utils::lsn::Lsn;
/// LayerMap is a BTreeMap keyed by RelishTag and the layer's start LSN.
/// It provides a couple of convenience functions over a plain BTreeMap
pub struct LayerMap {
pub inner: BTreeMap<(RelishTag, Lsn), Arc<dyn Layer>>,
}
impl LayerMap {
///
/// Look up using the given rel tag and LSN. This differs from a plain
/// key-value lookup in that if there is any layer that covers the
/// given LSN, or precedes the given LSN, it is returned. In other words,
/// you don't need to know the exact start LSN of the layer.
///
pub fn get(&self, tag: RelishTag, lsn: Lsn) -> Option<Arc<dyn Layer>> {
let startkey = (tag, Lsn(0));
let endkey = (tag, lsn);
if let Some((_k, v)) = self
.inner
.range((Included(startkey), Included(endkey)))
.next_back()
{
Some(Arc::clone(v))
} else {
None
}
}
pub fn insert(&mut self, layer: Arc<dyn Layer>) {
let rel = layer.get_relish_tag();
let start_lsn = layer.get_start_lsn();
self.inner.insert((rel, start_lsn), Arc::clone(&layer));
}
pub fn remove(&mut self, layer: &dyn Layer) {
let rel = layer.get_relish_tag();
let start_lsn = layer.get_start_lsn();
self.inner.remove(&(rel, start_lsn));
}
pub fn list_rels(&self, spcnode: u32, dbnode: u32) -> Result<HashSet<RelTag>> {
let mut rels: HashSet<RelTag> = HashSet::new();
// Scan the timeline directory to get all rels in this timeline.
for ((rel, _lsn), _l) in self.inner.iter() {
if let RelishTag::Relation(reltag) = rel {
// FIXME: skip if it was dropped before the requested LSN. But there is no
// LSN argument
if (spcnode == 0 || reltag.spcnode == spcnode)
&& (dbnode == 0 || reltag.dbnode == dbnode)
{
rels.insert(*reltag);
}
}
}
Ok(rels)
}
pub fn list_nonrels(&self, _lsn: Lsn) -> Result<HashSet<RelishTag>> {
let mut rels: HashSet<RelishTag> = HashSet::new();
// Scan the timeline directory to get all rels in this timeline.
for ((rel, _lsn), _l) in self.inner.iter() {
// FIXME: skip if it was dropped before the requested LSN.
if let RelishTag::Relation(_) = rel {
} else {
rels.insert(*rel);
}
}
Ok(rels)
}
/// Is there a newer layer for given relation?
pub fn newer_layer_exists(&self, rel: RelishTag, lsn: Lsn) -> bool {
let startkey = (rel, lsn);
let endkey = (rel, Lsn(u64::MAX));
for ((_rel, newer_lsn), layer) in self.inner.range((Included(startkey), Included(endkey))) {
if layer.get_end_lsn() > lsn {
trace!(
"found later layer for rel {}, {} {}-{}",
rel,
lsn,
newer_lsn,
layer.get_end_lsn()
);
return true;
} else {
trace!(
"found singleton layer for rel {}, {} {}",
rel, lsn, newer_lsn
);
continue;
}
}
trace!("no later layer found for rel {}, {}", rel, lsn);
false
}
}
impl Default for LayerMap {
fn default() -> Self {
LayerMap {
inner: BTreeMap::new(),
}
}
}

View File

@@ -0,0 +1,631 @@
//!
//! A SnapshotLayer represents one snapshot file on disk. One file holds all page
//! version and size information of one relation, in a range of LSN.
//! The name "snapshot file" is a bit of a misnomer because a snapshot file doesn't
//! contain a snapshot at a specific LSN, but rather all the page versions in a range
//! of LSNs.
//!
//! Currently, a snapshot file contains full information needed to reconstruct any
//! page version in the LSN range, without consulting any other snapshot files. When
//! a new snapshot file is created for writing, the full contents of relation are
//! materialized as it is at the beginning of the LSN range. That can be very expensive,
//! we should find a way to store differential files. But this keeps the read-side
//! of things simple. You can find the correct snapshot file based on RelishTag and
//! timeline+LSN, and once you've located it, you have all the data you need to in that
//! file.
//!
//! When a snapshot file needs to be accessed, we slurp the whole file into memory, into
//! the SnapshotLayer struct. See load() and unload() functions.
//!
//! On disk, the snapshot files are stored in timelines/<timelineid> directory.
//! Currently, there are no subdirectories, and each snapshot file is named like this:
//!
//! <spcnode>_<dbnode>_<relnode>_<forknum>_<start LSN>_<end LSN>
//!
//! For example:
//!
//! 1663_13990_2609_0_000000000169C348_000000000169C349
//!
//! If a relation is dropped, we add a '_DROPPED' to the end of the filename to indicate that.
//! So the above example would become:
//!
//! 1663_13990_2609_0_000000000169C348_000000000169C349_DROPPED
//!
//! The end LSN indicates when it was dropped in that case, we don't store it in the
//! file contents in any way.
//!
//! A snapshot file is constructed using the 'bookfile' crate. Each file consists of two
//! parts: the page versions and the relation sizes. They are stored as separate chapters.
//!
use crate::layered_repository::storage_layer::Layer;
use crate::layered_repository::storage_layer::PageVersion;
use crate::layered_repository::storage_layer::ZERO_PAGE;
use crate::relish::*;
use crate::repository::WALRecord;
use crate::walredo::WalRedoManager;
use crate::PageServerConf;
use crate::{ZTenantId, ZTimelineId};
use anyhow::{bail, Result};
use bytes::Bytes;
use log::*;
use std::collections::BTreeMap;
use std::fmt;
use std::fs;
use std::fs::File;
use std::io::Write;
use std::ops::Bound::Included;
use std::path::PathBuf;
use std::sync::{Arc, Mutex, MutexGuard};
use bookfile::{Book, BookWriter};
use zenith_utils::bin_ser::BeSer;
use zenith_utils::lsn::Lsn;
// Magic constant to identify a Zenith snapshot file
static SNAPSHOT_FILE_MAGIC: u32 = 0x5A616E01;
static PAGE_VERSIONS_CHAPTER: u64 = 1;
static REL_SIZES_CHAPTER: u64 = 2;
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
struct SnapshotFileName {
rel: RelishTag,
start_lsn: Lsn,
end_lsn: Lsn,
dropped: bool,
}
impl SnapshotFileName {
fn from_str(fname: &str) -> Option<Self> {
// Split the filename into parts
//
// <spcnode>_<dbnode>_<relnode>_<forknum>_<start LSN>_<end LSN>
//
// or if it was dropped:
//
// <spcnode>_<dbnode>_<relnode>_<forknum>_<start LSN>_<end LSN>_DROPPED
//
let rel;
let mut parts;
if let Some(rest) = fname.strip_prefix("rel_") {
parts = rest.split('_');
rel = RelishTag::Relation(RelTag {
spcnode: parts.next()?.parse::<u32>().ok()?,
dbnode: parts.next()?.parse::<u32>().ok()?,
relnode: parts.next()?.parse::<u32>().ok()?,
forknum: parts.next()?.parse::<u8>().ok()?,
});
} else if let Some(rest) = fname.strip_prefix("pg_xact_") {
parts = rest.split('_');
rel = RelishTag::Slru {
slru: SlruKind::Clog,
segno: u32::from_str_radix(parts.next()?, 16).ok()?,
};
} else if let Some(rest) = fname.strip_prefix("pg_multixact_members_") {
parts = rest.split('_');
rel = RelishTag::Slru {
slru: SlruKind::MultiXactMembers,
segno: u32::from_str_radix(parts.next()?, 16).ok()?,
};
} else if let Some(rest) = fname.strip_prefix("pg_multixact_offsets_") {
parts = rest.split('_');
rel = RelishTag::Slru {
slru: SlruKind::MultiXactOffsets,
segno: u32::from_str_radix(parts.next()?, 16).ok()?,
};
} else if let Some(rest) = fname.strip_prefix("pg_filenodemap_") {
parts = rest.split('_');
rel = RelishTag::FileNodeMap {
spcnode: parts.next()?.parse::<u32>().ok()?,
dbnode: parts.next()?.parse::<u32>().ok()?,
};
} else if let Some(rest) = fname.strip_prefix("pg_twophase_") {
parts = rest.split('_');
rel = RelishTag::TwoPhase {
xid: parts.next()?.parse::<u32>().ok()?,
};
} else if let Some(rest) = fname.strip_prefix("pg_control_checkpoint_") {
parts = rest.split('_');
rel = RelishTag::Checkpoint;
} else if let Some(rest) = fname.strip_prefix("pg_control_") {
parts = rest.split('_');
rel = RelishTag::ControlFile;
} else {
return None;
}
let start_lsn = Lsn::from_hex(parts.next()?).ok()?;
let end_lsn = Lsn::from_hex(parts.next()?).ok()?;
let mut dropped = false;
if let Some(suffix) = parts.next() {
if suffix == "DROPPED" {
dropped = true;
} else {
warn!("unrecognized filename in timeline dir: {}", fname);
return None;
}
}
if parts.next().is_some() {
warn!("unrecognized filename in timeline dir: {}", fname);
return None;
}
Some(SnapshotFileName {
rel,
start_lsn,
end_lsn,
dropped,
})
}
fn to_string(&self) -> String {
let basename = match self.rel {
RelishTag::Relation(reltag) => format!(
"rel_{}_{}_{}_{}",
reltag.spcnode, reltag.dbnode, reltag.relnode, reltag.forknum
),
RelishTag::Slru {
slru: SlruKind::Clog,
segno,
} => format!("pg_xact_{:04X}", segno),
RelishTag::Slru {
slru: SlruKind::MultiXactMembers,
segno,
} => format!("pg_multixact_members_{:04X}", segno),
RelishTag::Slru {
slru: SlruKind::MultiXactOffsets,
segno,
} => format!("pg_multixact_offsets_{:04X}", segno),
RelishTag::FileNodeMap { spcnode, dbnode } => {
format!("pg_filenodemap_{}_{}", spcnode, dbnode)
}
RelishTag::TwoPhase { xid } => format!("pg_twophase_{}", xid),
RelishTag::Checkpoint => format!("pg_control_checkpoint"),
RelishTag::ControlFile => format!("pg_control"),
};
format!(
"{}_{:016X}_{:016X}{}",
basename,
u64::from(self.start_lsn),
u64::from(self.end_lsn),
if self.dropped { "_DROPPED" } else { "" }
)
}
}
impl fmt::Display for SnapshotFileName {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.to_string())
}
}
///
/// SnapshotLayer is the in-memory data structure associated with an
/// on-disk snapshot file. We keep a SnapshotLayer in memory for each
/// file, in the LayerMap. If a layer is in "loaded" state, we have a
/// copy of the file in memory, in 'inner'. Otherwise the struct is
/// just a placeholder for a file that exists on disk, and it needs to
/// be loaded before using it in queries.
///
pub struct SnapshotLayer {
conf: &'static PageServerConf,
pub tenantid: ZTenantId,
pub timelineid: ZTimelineId,
pub rel: RelishTag,
//
// This entry contains all the changes from 'start_lsn' to 'end_lsn'. The
// start is inclusive, and end is exclusive.
pub start_lsn: Lsn,
pub end_lsn: Lsn,
dropped: bool,
inner: Mutex<SnapshotLayerInner>,
}
pub struct SnapshotLayerInner {
/// If false, the 'page_versions' and 'relsizes' have not been
/// loaded into memory yet.
loaded: bool,
/// All versions of all pages in the file are are kept here.
/// Indexed by block number and LSN.
page_versions: BTreeMap<(u32, Lsn), PageVersion>,
/// `relsizes` tracks the size of the relation at different points in time.
relsizes: BTreeMap<Lsn, u32>,
}
impl Layer for SnapshotLayer {
fn is_frozen(&self) -> bool {
return true;
}
fn get_timeline_id(&self) -> ZTimelineId {
return self.timelineid;
}
fn get_relish_tag(&self) -> RelishTag {
return self.rel;
}
fn is_dropped(&self) -> bool {
return self.dropped;
}
fn get_start_lsn(&self) -> Lsn {
return self.start_lsn;
}
fn get_end_lsn(&self) -> Lsn {
return self.end_lsn;
}
/// Look up given page in the cache.
fn get_page_at_lsn(
&self,
walredo_mgr: &dyn WalRedoManager,
blknum: u32,
lsn: Lsn,
) -> Result<Bytes> {
// Scan the BTreeMap backwards, starting from the given entry.
let mut records: Vec<WALRecord> = Vec::new();
let mut page_img: Option<Bytes> = None;
let mut need_base_image_lsn: Option<Lsn> = Some(lsn);
{
let inner = self.load()?;
let minkey = (blknum, Lsn(0));
let maxkey = (blknum, lsn);
let mut iter = inner
.page_versions
.range((Included(&minkey), Included(&maxkey)));
while let Some(((_blknum, entry_lsn), entry)) = iter.next_back() {
if let Some(img) = &entry.page_image {
page_img = Some(img.clone());
need_base_image_lsn = None;
break;
} else if let Some(rec) = &entry.record {
records.push(rec.clone());
if rec.will_init {
// This WAL record initializes the page, so no need to go further back
need_base_image_lsn = None;
break;
} else {
need_base_image_lsn = Some(*entry_lsn);
}
} else {
// No base image, and no WAL record. Huh?
bail!("no page image or WAL record for requested page");
}
}
// release lock on 'inner'
}
records.reverse();
// If we needed a base image to apply the WAL records against, we should have found it in memory.
if let Some(lsn) = need_base_image_lsn {
if records.is_empty() {
// no records, and no base image. This can happen if PostgreSQL extends a relation
// but never writes the page.
//
// Would be nice to detect that situation better.
warn!("Page {} blk {} at {} not found", self.rel, blknum, lsn);
return Ok(ZERO_PAGE.clone());
}
bail!(
"No base image found for page {} blk {} at {}/{}",
self.rel,
blknum,
self.timelineid,
lsn
);
}
// If we have a page image, and no WAL, we're all set
if records.is_empty() {
if let Some(img) = page_img {
trace!(
"found page image for blk {} in {} at {}/{}, no WAL redo required",
blknum,
self.rel,
self.timelineid,
lsn
);
Ok(img)
} else {
// FIXME: this ought to be an error?
warn!("Page {} blk {} at {} not found", self.rel, blknum, lsn);
Ok(ZERO_PAGE.clone())
}
} else {
// We need to do WAL redo.
//
// If we don't have a base image, then the oldest WAL record better initialize
// the page
if page_img.is_none() && !records.first().unwrap().will_init {
// FIXME: this ought to be an error?
warn!(
"Base image for page {} blk {} at {} not found, but got {} WAL records",
self.rel,
blknum,
lsn,
records.len()
);
Ok(ZERO_PAGE.clone())
} else {
if page_img.is_some() {
trace!("found {} WAL records and a base image for blk {} in {} at {}/{}, performing WAL redo", records.len(), blknum, self.rel, self.timelineid, lsn);
} else {
trace!("found {} WAL records that will init the page for blk {} in {} at {}/{}, performing WAL redo", records.len(), blknum, self.rel, self.timelineid, lsn);
}
let img = walredo_mgr.request_redo(self.rel, blknum, lsn, page_img, records)?;
// FIXME: Should we memoize the page image in memory, so that
// we wouldn't need to reconstruct it again, if it's requested again?
//self.put_page_image(blknum, lsn, img.clone())?;
Ok(img)
}
}
}
/// Get size of the relation at given LSN
fn get_relish_size(&self, lsn: Lsn) -> Result<Option<u32>> {
// Scan the BTreeMap backwards, starting from the given entry.
let inner = self.load()?;
let mut iter = inner.relsizes.range((Included(&Lsn(0)), Included(&lsn)));
if let Some((_entry_lsn, entry)) = iter.next_back() {
let result = *entry;
drop(inner);
trace!("get_relsize: {} at {} -> {}", self.rel, lsn, result);
Ok(Some(result))
} else {
Ok(None)
}
}
/// Does this relation exist at given LSN?
fn get_rel_exists(&self, lsn: Lsn) -> Result<bool> {
// Is the requested LSN after the rel was dropped?
if self.dropped && lsn >= self.end_lsn {
return Ok(false);
}
// Otherwise, it exists.
Ok(true)
}
// Unsupported write operations
fn put_page_version(&self, blknum: u32, lsn: Lsn, _pv: PageVersion) -> Result<()> {
panic!(
"cannot modify historical snapshot layer, rel {} blk {} at {}/{}, {}-{}",
self.rel, blknum, self.timelineid, lsn, self.start_lsn, self.end_lsn
);
}
fn put_truncation(&self, _lsn: Lsn, _relsize: u32) -> anyhow::Result<()> {
bail!("cannot modify historical snapshot layer");
}
fn put_unlink(&self, _lsn: Lsn) -> anyhow::Result<()> {
bail!("cannot modify historical snapshot layer");
}
fn freeze(
&self,
_end_lsn: Lsn,
_walredo_mgr: &dyn WalRedoManager,
) -> Result<Vec<Arc<dyn Layer>>> {
bail!("cannot freeze historical snapshot layer");
}
fn delete(&self) -> Result<()> {
// delete underlying file
fs::remove_file(self.path())?;
Ok(())
}
///
/// Release most of the memory used by this layer. If it's accessed again later,
/// it will need to be loaded back.
///
fn unload(&self) -> Result<()> {
let mut inner = self.inner.lock().unwrap();
inner.page_versions = BTreeMap::new();
inner.relsizes = BTreeMap::new();
inner.loaded = false;
Ok(())
}
}
impl SnapshotLayer {
fn path(&self) -> PathBuf {
Self::path_for(
self.conf,
self.timelineid,
self.tenantid,
&SnapshotFileName {
rel: self.rel,
start_lsn: self.start_lsn,
end_lsn: self.end_lsn,
dropped: self.dropped,
},
)
}
fn path_for(
conf: &'static PageServerConf,
timelineid: ZTimelineId,
tenantid: ZTenantId,
fname: &SnapshotFileName,
) -> PathBuf {
conf.timeline_path(&timelineid, &tenantid)
.join(fname.to_string())
}
/// Create a new snapshot file, using the given btreemaps containing the page versions and
/// relsizes.
///
/// This is used to write the in-memory layer to disk. The in-memory layer uses the same
/// data structure with two btreemaps as we do, so passing the btreemaps is currently
/// expedient.
pub fn create(
conf: &'static PageServerConf,
timelineid: ZTimelineId,
tenantid: ZTenantId,
rel: RelishTag,
start_lsn: Lsn,
end_lsn: Lsn,
dropped: bool,
page_versions: BTreeMap<(u32, Lsn), PageVersion>,
relsizes: BTreeMap<Lsn, u32>,
) -> Result<SnapshotLayer> {
let snapfile = SnapshotLayer {
conf: conf,
timelineid: timelineid,
tenantid: tenantid,
rel: rel,
start_lsn: start_lsn,
end_lsn,
dropped,
inner: Mutex::new(SnapshotLayerInner {
loaded: true,
page_versions: page_versions,
relsizes: relsizes,
}),
};
let inner = snapfile.inner.lock().unwrap();
// Write the in-memory btreemaps into a file
let path = snapfile.path();
// Note: This overwrites any existing file. There shouldn't be any.
// FIXME: throw an error instead?
let file = File::create(&path)?;
let book = BookWriter::new(file, SNAPSHOT_FILE_MAGIC)?;
// Write out page versions
let mut chapter = book.new_chapter(PAGE_VERSIONS_CHAPTER);
let buf = BTreeMap::ser(&inner.page_versions)?;
chapter.write_all(&buf)?;
let book = chapter.close()?;
// and relsizes to separate chapter
let mut chapter = book.new_chapter(REL_SIZES_CHAPTER);
let buf = BTreeMap::ser(&inner.relsizes)?;
chapter.write_all(&buf)?;
let book = chapter.close()?;
book.close()?;
trace!("saved {}", &path.display());
drop(inner);
Ok(snapfile)
}
///
/// Load the contents of the file into memory
///
fn load(&self) -> Result<MutexGuard<SnapshotLayerInner>> {
// quick exit if already loaded
let mut inner = self.inner.lock().unwrap();
if inner.loaded {
return Ok(inner);
}
let path = Self::path_for(
self.conf,
self.timelineid,
self.tenantid,
&SnapshotFileName {
rel: self.rel,
start_lsn: self.start_lsn,
end_lsn: self.end_lsn,
dropped: self.dropped,
},
);
let file = File::open(&path)?;
let book = Book::new(file)?;
let chapter = book.read_chapter(PAGE_VERSIONS_CHAPTER)?;
let page_versions = BTreeMap::des(&chapter)?;
let chapter = book.read_chapter(REL_SIZES_CHAPTER)?;
let relsizes = BTreeMap::des(&chapter)?;
debug!("loaded from {}", &path.display());
*inner = SnapshotLayerInner {
loaded: true,
page_versions,
relsizes,
};
Ok(inner)
}
/// Create SnapshotLayers representing all files on dik
///
// TODO: returning an Iterator would be more idiomatic
pub fn list_snapshot_files(
conf: &'static PageServerConf,
timelineid: ZTimelineId,
tenantid: ZTenantId,
) -> Result<Vec<Arc<dyn Layer>>> {
let path = conf.timeline_path(&timelineid, &tenantid);
let mut snapfiles: Vec<Arc<dyn Layer>> = Vec::new();
for direntry in fs::read_dir(path)? {
let fname = direntry?.file_name();
let fname = fname.to_str().unwrap();
if let Some(snapfilename) = SnapshotFileName::from_str(fname) {
let snapfile = SnapshotLayer {
conf,
timelineid,
tenantid,
rel: snapfilename.rel,
start_lsn: snapfilename.start_lsn,
end_lsn: snapfilename.end_lsn,
dropped: snapfilename.dropped,
inner: Mutex::new(SnapshotLayerInner {
loaded: false,
page_versions: BTreeMap::new(),
relsizes: BTreeMap::new(),
}),
};
snapfiles.push(Arc::new(snapfile));
}
}
return Ok(snapfiles);
}
/// debugging function to print out the contents of the layer
#[allow(unused)]
pub fn dump(&self) -> String {
let mut result = format!(
"----- snapshot layer for {} {}-{} ----\n",
self.rel, self.start_lsn, self.end_lsn
);
let inner = self.inner.lock().unwrap();
for (k, v) in inner.relsizes.iter() {
result += &format!("{}: {}\n", k, v);
}
//for (k, v) in inner.page_versions.iter() {
// result += &format!("blk {} at {}: {}/{}\n", k.0, k.1, v.page_image.is_some(), v.record.is_some());
//}
result
}
}

View File

@@ -0,0 +1,123 @@
use crate::relish::RelishTag;
use crate::repository::WALRecord;
use crate::walredo::WalRedoManager;
use crate::ZTimelineId;
use anyhow::Result;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use zenith_utils::lsn::Lsn;
pub static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]);
///
/// Represents a version of a page at a specific LSN. The LSN is the key of the
/// entry in the 'page_versions' hash, it is not duplicated here.
///
/// A page version can be stored as a full page image, or as WAL record that needs
/// to be applied over the previous page version to reconstruct this version.
///
/// It's also possible to have both a WAL record and a page image in the same
/// PageVersion. That happens if page version is originally stored as a WAL record
/// but it is later reconstructed by a GetPage@LSN request by performing WAL
/// redo. The get_page_at_lsn() code will store the reconstructed pag image next to
/// the WAL record in that case. TODO: That's pretty accidental, not the result
/// of any grand design. If we want to keep reconstructed page versions around, we
/// probably should have a separate buffer cache so that we could control the
/// replacement policy globally. Or if we keep a reconstructed page image, we
/// could throw away the WAL record.
///
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PageVersion {
/// an 8kb page image
pub page_image: Option<Bytes>,
/// WAL record to get from previous page version to this one.
pub record: Option<WALRecord>,
}
///
/// A Layer holds all page versions for one relish, in a range of LSNs.
/// There are two kinds of layers, in-memory and snapshot layers. In-memory
/// layers are used to ingest incoming WAL, and provide fast access
/// to the recent page versions. Snaphot layers are stored on disk, and
/// are immutable.
///
/// Each layer contains a full snapshot of the relish at the start
/// LSN. In addition to that, it contains WAL (or more page images)
/// needed to recontruct any page version up to the end LSN.
///
pub trait Layer: Send + Sync {
// These functions identify the relish and the LSN range that this Layer
// holds.
fn get_timeline_id(&self) -> ZTimelineId;
fn get_relish_tag(&self) -> RelishTag;
fn get_start_lsn(&self) -> Lsn;
fn get_end_lsn(&self) -> Lsn;
fn is_dropped(&self) -> bool;
/// Frozen layers are stored on disk, an cannot accept cannot accept new WAL
/// records, whereas an unfrozen layer can still be modified, but is not
/// durable in case of a crash. Snapshot layers are always frozen, and
/// in-memory layers are always unfrozen.
fn is_frozen(&self) -> bool;
// Functions that correspond to the Timeline trait functions.
fn get_page_at_lsn(
&self,
walredo_mgr: &dyn WalRedoManager,
blknum: u32,
lsn: Lsn,
) -> Result<Bytes>;
fn get_relish_size(&self, lsn: Lsn) -> Result<Option<u32>>;
fn get_rel_exists(&self, lsn: Lsn) -> Result<bool>;
fn put_page_version(&self, blknum: u32, lsn: Lsn, pv: PageVersion) -> Result<()>;
fn put_truncation(&self, lsn: Lsn, relsize: u32) -> anyhow::Result<()>;
fn put_unlink(&self, lsn: Lsn) -> anyhow::Result<()>;
/// Remember new page version, as a WAL record over previous version
fn put_wal_record(&self, blknum: u32, rec: WALRecord) -> Result<()> {
self.put_page_version(
blknum,
rec.lsn,
PageVersion {
page_image: None,
record: Some(rec),
},
)
}
/// Remember new page version, as a full page image
fn put_page_image(&self, blknum: u32, lsn: Lsn, img: Bytes) -> Result<()> {
self.put_page_version(
blknum,
lsn,
PageVersion {
page_image: Some(img),
record: None,
},
)
}
///
/// Split off an immutable layer from existing layer.
///
/// Returns new layers that replace this one.
///
fn freeze(&self, end_lsn: Lsn, walredo_mgr: &dyn WalRedoManager)
-> Result<Vec<Arc<dyn Layer>>>;
/// Permanently delete this layer
fn delete(&self) -> Result<()>;
/// Try to release memory used by this layer. This is currently
/// only used by snapshot layers, to free the copy of the file
/// from memory. (TODO: a smarter, more granular caching scheme
/// would be nice)
fn unload(&self) -> Result<()>;
}

View File

@@ -9,6 +9,7 @@ use zenith_metrics::{register_int_gauge_vec, IntGaugeVec};
pub mod basebackup;
pub mod branches;
pub mod layered_repository;
pub mod logger;
pub mod object_key;
pub mod object_repository;
@@ -54,6 +55,14 @@ pub struct PageServerConf {
pub auth_type: AuthType,
pub auth_validation_public_key_path: Option<PathBuf>,
pub repository_format: RepositoryFormat,
}
#[derive(Debug, Clone, PartialEq)]
pub enum RepositoryFormat {
Layered,
RocksDb,
}
impl PageServerConf {

View File

@@ -2,11 +2,12 @@
//! page server.
use crate::branches;
use crate::layered_repository::LayeredRepository;
use crate::object_repository::ObjectRepository;
use crate::repository::Repository;
use crate::rocksdb_storage::RocksObjectStore;
use crate::walredo::PostgresRedoManager;
use crate::PageServerConf;
use crate::{PageServerConf, RepositoryFormat};
use anyhow::{anyhow, bail, Result};
use lazy_static::lazy_static;
use log::info;
@@ -27,16 +28,35 @@ pub fn init(conf: &'static PageServerConf) {
for dir_entry in fs::read_dir(conf.tenants_path()).unwrap() {
let tenantid =
ZTenantId::from_str(dir_entry.unwrap().file_name().to_str().unwrap()).unwrap();
let obj_store = RocksObjectStore::open(conf, &tenantid).unwrap();
// Set up a WAL redo manager, for applying WAL records.
let walredo_mgr = PostgresRedoManager::new(conf, tenantid);
// Set up an object repository, for actual data storage.
let repo =
ObjectRepository::new(conf, Arc::new(obj_store), Arc::new(walredo_mgr), tenantid);
let repo: Arc<dyn Repository + Sync + Send> = match conf.repository_format {
RepositoryFormat::Layered => {
let repo = Arc::new(LayeredRepository::new(
conf,
Arc::new(walredo_mgr),
tenantid,
));
LayeredRepository::launch_checkpointer_thread(conf, repo.clone());
repo
}
RepositoryFormat::RocksDb => {
let obj_store = RocksObjectStore::open(conf, &tenantid).unwrap();
Arc::new(ObjectRepository::new(
conf,
Arc::new(obj_store),
Arc::new(walredo_mgr),
tenantid,
))
}
};
info!("initialized storage for tenant: {}", &tenantid);
m.insert(tenantid, Arc::new(repo));
m.insert(tenantid, repo);
}
}
@@ -53,7 +73,7 @@ pub fn create_repository_for_tenant(
let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenantid));
let repo = branches::create_repo(conf, tenantid, wal_redo_manager)?;
m.insert(tenantid, Arc::new(repo));
m.insert(tenantid, repo);
Ok(())
}

View File

@@ -693,6 +693,18 @@ impl postgres_backend::Handler for PageServerHandler {
RowDescriptor::int8_col(b"control_deleted"),
RowDescriptor::int8_col(b"filenodemap_deleted"),
RowDescriptor::int8_col(b"dropped"),
RowDescriptor::int8_col(b"snapshot_relfiles_total"),
RowDescriptor::int8_col(b"snapshot_relfiles_needed_by_cutoff"),
RowDescriptor::int8_col(b"snapshot_relfiles_needed_by_branches"),
RowDescriptor::int8_col(b"snapshot_relfiles_not_updated"),
RowDescriptor::int8_col(b"snapshot_relfiles_removed"),
RowDescriptor::int8_col(b"snapshot_relfiles_dropped"),
RowDescriptor::int8_col(b"snapshot_nonrelfiles_total"),
RowDescriptor::int8_col(b"snapshot_nonrelfiles_needed_by_cutoff"),
RowDescriptor::int8_col(b"snapshot_nonrelfiles_needed_by_branches"),
RowDescriptor::int8_col(b"snapshot_nonrelfiles_not_updated"),
RowDescriptor::int8_col(b"snapshot_nonrelfiles_removed"),
RowDescriptor::int8_col(b"snapshot_nonrelfiles_dropped"),
RowDescriptor::int8_col(b"elapsed"),
]))?
.write_message_noflush(&BeMessage::DataRow(&[
@@ -705,6 +717,43 @@ impl postgres_backend::Handler for PageServerHandler {
Some(&result.control_deleted.to_string().as_bytes()),
Some(&result.filenodemap_deleted.to_string().as_bytes()),
Some(&result.dropped.to_string().as_bytes()),
Some(&result.snapshot_relfiles_total.to_string().as_bytes()),
Some(
&result
.snapshot_relfiles_needed_by_cutoff
.to_string()
.as_bytes(),
),
Some(
&result
.snapshot_relfiles_needed_by_branches
.to_string()
.as_bytes(),
),
Some(&result.snapshot_relfiles_not_updated.to_string().as_bytes()),
Some(&result.snapshot_relfiles_removed.to_string().as_bytes()),
Some(&result.snapshot_relfiles_dropped.to_string().as_bytes()),
Some(&result.snapshot_nonrelfiles_total.to_string().as_bytes()),
Some(
&result
.snapshot_nonrelfiles_needed_by_cutoff
.to_string()
.as_bytes(),
),
Some(
&result
.snapshot_nonrelfiles_needed_by_branches
.to_string()
.as_bytes(),
),
Some(
&result
.snapshot_nonrelfiles_not_updated
.to_string()
.as_bytes(),
),
Some(&result.snapshot_nonrelfiles_removed.to_string().as_bytes()),
Some(&result.snapshot_nonrelfiles_dropped.to_string().as_bytes()),
Some(&result.elapsed.as_millis().to_string().as_bytes()),
]))?
.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;

View File

@@ -120,7 +120,16 @@ impl RelishTag {
// and these don't
| RelishTag::ControlFile
| RelishTag::Checkpoint => false,
| RelishTag::Checkpoint => false,
}
}
// convenience function to check if this relish is a normal relation.
pub const fn is_relation(&self) -> bool {
if let RelishTag::Relation(_) = self {
true
} else {
false
}
}
}

View File

@@ -5,6 +5,7 @@ use bytes::{Buf, BufMut, Bytes, BytesMut};
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::iter::Iterator;
use std::ops::AddAssign;
use std::sync::Arc;
use std::time::Duration;
use zenith_utils::lsn::Lsn;
@@ -56,6 +57,8 @@ pub trait Repository: Send + Sync {
///
#[derive(Default)]
pub struct GcResult {
// FIXME: These counters make sense for the ObjectRepository. They are not used
// by the LayeredRepository.
pub n_relations: u64,
pub inspected: u64,
pub truncated: u64,
@@ -66,9 +69,51 @@ pub struct GcResult {
pub control_deleted: u64, // RelishTag::ControlFile
pub filenodemap_deleted: u64, // RelishTag::FileNodeMap
pub dropped: u64,
// These are used for the LayeredRepository instead
pub snapshot_relfiles_total: u64,
pub snapshot_relfiles_needed_by_cutoff: u64,
pub snapshot_relfiles_needed_by_branches: u64,
pub snapshot_relfiles_not_updated: u64,
pub snapshot_relfiles_removed: u64, // # of snapshot files removed because they have been made obsolete by newer snapshot files.
pub snapshot_relfiles_dropped: u64, // # of snapshot files removed because the relation was dropped
pub snapshot_nonrelfiles_total: u64,
pub snapshot_nonrelfiles_needed_by_cutoff: u64,
pub snapshot_nonrelfiles_needed_by_branches: u64,
pub snapshot_nonrelfiles_not_updated: u64,
pub snapshot_nonrelfiles_removed: u64, // # of snapshot files removed because they have been made obsolete by newer snapshot files.
pub snapshot_nonrelfiles_dropped: u64, // # of snapshot files removed because the relation was dropped
pub elapsed: Duration,
}
impl AddAssign for GcResult {
fn add_assign(&mut self, other: Self) {
self.n_relations += other.n_relations;
self.truncated += other.truncated;
self.deleted += other.deleted;
self.dropped += other.dropped;
self.snapshot_relfiles_total += other.snapshot_relfiles_total;
self.snapshot_relfiles_needed_by_cutoff += other.snapshot_relfiles_needed_by_cutoff;
self.snapshot_relfiles_needed_by_branches += other.snapshot_relfiles_needed_by_branches;
self.snapshot_relfiles_not_updated += other.snapshot_relfiles_not_updated;
self.snapshot_relfiles_removed += other.snapshot_relfiles_removed;
self.snapshot_relfiles_dropped += other.snapshot_relfiles_dropped;
self.snapshot_nonrelfiles_total += other.snapshot_nonrelfiles_total;
self.snapshot_nonrelfiles_needed_by_cutoff += other.snapshot_nonrelfiles_needed_by_cutoff;
self.snapshot_nonrelfiles_needed_by_branches +=
other.snapshot_nonrelfiles_needed_by_branches;
self.snapshot_nonrelfiles_not_updated += other.snapshot_nonrelfiles_not_updated;
self.snapshot_nonrelfiles_removed += other.snapshot_nonrelfiles_removed;
self.snapshot_nonrelfiles_dropped += other.snapshot_nonrelfiles_dropped;
self.elapsed += other.elapsed;
}
}
pub trait Timeline: Send + Sync {
//------------------------------------------------------------------------------
// Public GET functions
@@ -234,11 +279,12 @@ impl WALRecord {
#[cfg(test)]
mod tests {
use super::*;
use crate::layered_repository::LayeredRepository;
use crate::object_repository::ObjectRepository;
use crate::object_repository::{ObjectValue, PageEntry, RelationSizeEntry};
use crate::rocksdb_storage::RocksObjectStore;
use crate::walredo::{WalRedoError, WalRedoManager};
use crate::PageServerConf;
use crate::{PageServerConf, RepositoryFormat};
use postgres_ffi::pg_constants;
use std::fs;
use std::path::PathBuf;
@@ -272,10 +318,16 @@ mod tests {
buf.freeze()
}
fn get_test_repo(test_name: &str) -> Result<Box<dyn Repository>> {
static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]);
fn get_test_repo(
test_name: &str,
repository_format: RepositoryFormat,
) -> Result<Box<dyn Repository>> {
let repo_dir = PathBuf::from(format!("../tmp_check/test_{}", test_name));
let _ = fs::remove_dir_all(&repo_dir);
fs::create_dir_all(&repo_dir).unwrap();
fs::create_dir_all(&repo_dir)?;
fs::create_dir_all(&repo_dir.join("timelines"))?;
let conf = PageServerConf {
daemonize: false,
@@ -288,6 +340,7 @@ mod tests {
pg_distrib_dir: "".into(),
auth_type: AuthType::Trust,
auth_validation_public_key_path: None,
repository_format,
};
// Make a static copy of the config. This can never be free'd, but that's
// OK in a test.
@@ -295,24 +348,47 @@ mod tests {
let tenantid = ZTenantId::generate();
fs::create_dir_all(conf.tenant_path(&tenantid)).unwrap();
let obj_store = RocksObjectStore::create(conf, &tenantid)?;
let walredo_mgr = TestRedoManager {};
let repo =
ObjectRepository::new(conf, Arc::new(obj_store), Arc::new(walredo_mgr), tenantid);
let repo: Box<dyn Repository + Sync + Send> = match conf.repository_format {
RepositoryFormat::Layered => Box::new(LayeredRepository::new(
conf,
Arc::new(walredo_mgr),
tenantid,
)),
RepositoryFormat::RocksDb => {
let obj_store = RocksObjectStore::create(conf, &tenantid)?;
Ok(Box::new(repo))
Box::new(ObjectRepository::new(
conf,
Arc::new(obj_store),
Arc::new(walredo_mgr),
tenantid,
))
}
};
Ok(repo)
}
/// Test get_relsize() and truncation.
#[test]
fn test_relsize() -> Result<()> {
fn test_relsize_rocksdb() -> Result<()> {
let repo = get_test_repo("test_relsize_rocksdb", RepositoryFormat::RocksDb)?;
test_relsize(&*repo)
}
#[test]
fn test_relsize_layered() -> Result<()> {
let repo = get_test_repo("test_relsize_layered", RepositoryFormat::Layered)?;
test_relsize(&*repo)
}
fn test_relsize(repo: &dyn Repository) -> Result<()> {
// get_timeline() with non-existent timeline id should fail
//repo.get_timeline("11223344556677881122334455667788");
// Create timeline to work on
let repo = get_test_repo("test_relsize")?;
let timelineid = ZTimelineId::from_str("11223344556677881122334455667788").unwrap();
let tline = repo.create_empty_timeline(timelineid, Lsn(0))?;
@@ -397,14 +473,24 @@ mod tests {
/// This isn't very interesting with the RocksDb implementation, as we don't pay
/// any attention to Postgres segment boundaries there.
#[test]
fn test_large_rel() -> Result<()> {
let repo = get_test_repo("test_large_rel")?;
fn test_large_rel_rocksdb() -> Result<()> {
let repo = get_test_repo("test_large_rel_rocksdb", RepositoryFormat::RocksDb)?;
test_large_rel(&*repo)
}
#[test]
fn test_large_rel_layered() -> Result<()> {
let repo = get_test_repo("test_large_rel_layered", RepositoryFormat::Layered)?;
test_large_rel(&*repo)
}
fn test_large_rel(repo: &dyn Repository) -> Result<()> {
let timelineid = ZTimelineId::from_str("11223344556677881122334455667788").unwrap();
let tline = repo.create_empty_timeline(timelineid, Lsn(0))?;
tline.init_valid_lsn(Lsn(1));
let mut lsn = 0;
let mut lsn = 1;
for blknum in 0..pg_constants::RELSEG_SIZE + 1 {
let img = TEST_IMG(&format!("foo blk {} at {}", blknum, Lsn(lsn)));
lsn += 1;
@@ -450,15 +536,29 @@ mod tests {
}))
}
#[test]
fn test_branch_rocksdb() -> Result<()> {
let repo = get_test_repo("test_branch_rocksdb", RepositoryFormat::RocksDb)?;
test_branch(&*repo)
}
#[test]
fn test_branch_layered() -> Result<()> {
let repo = get_test_repo("test_branch_layered", RepositoryFormat::Layered)?;
test_branch(&*repo)
}
///
/// Test branch creation
///
#[test]
fn test_branch() -> Result<()> {
let repo = get_test_repo("test_branch")?;
fn test_branch(repo: &dyn Repository) -> Result<()> {
let timelineid = ZTimelineId::from_str("11223344556677881122334455667788").unwrap();
let tline = repo.create_empty_timeline(timelineid, Lsn(0))?;
// Import initial dummy checkpoint record, otherwise the get_timeline() call
// after branching fails below
tline.put_page_image(RelishTag::Checkpoint, 0, Lsn(1), ZERO_PAGE.clone(), false)?;
// Create a relation on the timeline
tline.init_valid_lsn(Lsn(1));
tline.put_page_image(TESTREL_A, 0, Lsn(2), TEST_IMG("foo blk 0 at 2"), true)?;
@@ -500,8 +600,19 @@ mod tests {
}
#[test]
fn test_history() -> Result<()> {
let repo = get_test_repo("test_snapshot")?;
fn test_history_rocksdb() -> Result<()> {
let repo = get_test_repo("test_history_rocksdb", RepositoryFormat::RocksDb)?;
test_history(&*repo)
}
#[test]
// TODO: This doesn't work with the layered storage, the functions needed for push/pull
// functionality haven't been implemented yet.
#[ignore]
fn test_history_layered() -> Result<()> {
let repo = get_test_repo("test_history_layered", RepositoryFormat::Layered)?;
test_history(&*repo)
}
fn test_history(repo: &dyn Repository) -> Result<()> {
let timelineid = ZTimelineId::from_str("11223344556677881122334455667788").unwrap();
let tline = repo.create_empty_timeline(timelineid, Lsn(0))?;

View File

@@ -132,6 +132,7 @@ pub fn import_timeline_from_postgres_datadir(
}
// TODO: Scan pg_tblspc
timeline.advance_last_valid_lsn(lsn);
timeline.checkpoint()?;
Ok(())
@@ -425,12 +426,12 @@ pub fn save_decoded_record(
save_xact_record(timeline, lsn, &parsed_xact, decoded)?;
// Remove twophase file. see RemoveTwoPhaseFile() in postgres code
info!(
"unlink twophaseFile for xid {} parsed_xact.xid {} here",
decoded.xl_xid, parsed_xact.xid
"unlink twophaseFile for xid {} parsed_xact.xid {} here at {}",
decoded.xl_xid, parsed_xact.xid, lsn
);
timeline.put_unlink(
RelishTag::TwoPhase {
xid: decoded.xl_xid,
xid: parsed_xact.xid,
},
lsn,
)?;
@@ -795,7 +796,13 @@ fn save_clog_truncate_record(
// Iterate via SLRU CLOG segments and unlink segments that we're ready to truncate
// TODO This implementation is very inefficient -
// it scans all non-rels only to find Clog
for obj in timeline.list_nonrels(lsn)? {
//
// We cannot pass 'lsn' to the Timeline.list_nonrels(), or it
// will block waiting for the last valid LSN to advance up to
// it. So we use the previous record's LSN in the get calls
// instead.
let req_lsn = min(timeline.get_last_record_lsn(), lsn);
for obj in timeline.list_nonrels(req_lsn)? {
match obj {
RelishTag::Slru { slru, segno } => {
if slru == SlruKind::Clog {

View File

@@ -8,7 +8,7 @@ use crate::page_cache;
use crate::relish::*;
use crate::restore_local_repo;
use crate::waldecoder::*;
use crate::PageServerConf;
use crate::{PageServerConf, RepositoryFormat};
use anyhow::{Error, Result};
use lazy_static::lazy_static;
use log::*;
@@ -264,7 +264,11 @@ fn walreceiver_main(
)?;
if newest_segno - oldest_segno >= 10 {
timeline.checkpoint()?;
// FIXME: The layered repository performs checkpointing in a separate thread, so this
// isn't needed anymore. Remove 'checkpoint' from the Timeline trait altogether?
if conf.repository_format == RepositoryFormat::RocksDb {
timeline.checkpoint()?;
}
// TODO: This is where we could remove WAL older than last_rec_lsn.
//remove_wal_files(timelineid, pg_constants::WAL_SEGMENT_SIZE, last_rec_lsn)?;

View File

@@ -14,7 +14,8 @@ pytest_plugins = ("fixtures.zenith_fixtures")
#
@pytest.mark.skip(reason=""""
Current GC test is flaky and overly strict. Since we are migrating to the layered repo format
with different GC implementation let's just silence this test for now.
with different GC implementation let's just silence this test for now. This test only
works with the RocksDB implementation.
""")
def test_gc(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, pg_bin):
zenith_cli.run(["branch", "test_gc", "empty"])

View File

@@ -0,0 +1,122 @@
from contextlib import closing
import psycopg2.extras
import time;
pytest_plugins = ("fixtures.zenith_fixtures")
def print_gc_result(row):
print("GC duration {elapsed} ms".format_map(row));
print(" REL total: {snapshot_relfiles_total}, needed_by_cutoff {snapshot_relfiles_needed_by_cutoff}, needed_by_branches: {snapshot_relfiles_needed_by_branches}, not_updated: {snapshot_relfiles_not_updated}, removed: {snapshot_relfiles_removed}, dropped: {snapshot_relfiles_dropped}".format_map(row))
print(" NONREL total: {snapshot_nonrelfiles_total}, needed_by_cutoff {snapshot_nonrelfiles_needed_by_cutoff}, needed_by_branches: {snapshot_nonrelfiles_needed_by_branches}, not_updated: {snapshot_nonrelfiles_not_updated}, removed: {snapshot_nonrelfiles_removed}, dropped: {snapshot_nonrelfiles_dropped}".format_map(row))
#
# Test Garbage Collection of old snapshot files
#
# This test is pretty tightly coupled with the current implementation of layered
# storage, in layered_repository.rs.
#
def test_snapfiles_gc(zenith_cli, pageserver, postgres, pg_bin):
zenith_cli.run(["branch", "test_snapfiles_gc", "empty"])
pg = postgres.create_start('test_snapfiles_gc')
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
with closing(pageserver.connect()) as psconn:
with psconn.cursor(cursor_factory = psycopg2.extras.DictCursor) as pscur:
# Get the timeline ID of our branch. We need it for the 'do_gc' command
cur.execute("SHOW zenith.zenith_timeline")
timeline = cur.fetchone()[0]
# Create a test table
cur.execute("CREATE TABLE foo(x integer)")
print("Inserting two more rows and running GC")
cur.execute("select relfilenode from pg_class where oid = 'foo'::regclass");
row = cur.fetchone();
print("relfilenode is {}", row[0]);
# Run GC, to clear out any garbage left behind in the catalogs by
# the CREATE TABLE command. We want to have a clean slate with no garbage
# before running the actual tests below, otherwise the counts won't match
# what we expect.
#
# Also run vacuum first to make it less likely that autovacuum or pruning
# kicks in and confuses our numbers.
cur.execute("VACUUM")
print("Running GC before test")
pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0")
row = pscur.fetchone()
print_gc_result(row);
# remember the number of files
snapshot_relfiles_remain = row['snapshot_relfiles_total'] - row['snapshot_relfiles_removed']
assert snapshot_relfiles_remain > 0
# Insert a row. The first insert will also create a metadata entry for the
# relation, with size == 1 block. Hence, bump up the expected relation count.
snapshot_relfiles_remain += 1;
print("Inserting one row and running GC")
cur.execute("INSERT INTO foo VALUES (1)")
pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0")
row = pscur.fetchone()
print_gc_result(row);
assert row['snapshot_relfiles_total'] == snapshot_relfiles_remain
assert row['snapshot_relfiles_removed'] == 0
assert row['snapshot_relfiles_dropped'] == 0
# Insert two more rows and run GC.
# This should create a new snapshot file with the new contents, and
# remove the old one.
print("Inserting two more rows and running GC")
cur.execute("INSERT INTO foo VALUES (2)")
cur.execute("INSERT INTO foo VALUES (3)")
pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0")
row = pscur.fetchone()
print_gc_result(row);
assert row['snapshot_relfiles_total'] == snapshot_relfiles_remain + 1
assert row['snapshot_relfiles_removed'] == 1
assert row['snapshot_relfiles_dropped'] == 0
# Do it again. Should again create a new snapshot file and remove old one.
print("Inserting two more rows and running GC")
cur.execute("INSERT INTO foo VALUES (2)")
cur.execute("INSERT INTO foo VALUES (3)")
pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0")
row = pscur.fetchone()
print_gc_result(row);
assert row['snapshot_relfiles_total'] == snapshot_relfiles_remain + 1
assert row['snapshot_relfiles_removed'] == 1
assert row['snapshot_relfiles_dropped'] == 0
# Run GC again, with no changes in the database. Should not remove anything.
print("Run GC again, with nothing to do")
pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0")
row = pscur.fetchone()
print_gc_result(row);
assert row['snapshot_relfiles_total'] == snapshot_relfiles_remain
assert row['snapshot_relfiles_removed'] == 0
assert row['snapshot_relfiles_dropped'] == 0
#
# Test DROP TABLE checks that relation data and metadata was deleted by GC from object storage
#
print("Drop table and run GC again");
cur.execute("DROP TABLE foo")
pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0")
row = pscur.fetchone()
print_gc_result(row);
# Each relation fork is counted separately, hence 3.
assert row['snapshot_relfiles_dropped'] == 3
# The catalog updates also create new snapshot files of the catalogs, which
# are counted as 'removed'
assert row['snapshot_relfiles_removed'] > 0
# TODO: perhaps we should count catalog and user relations separately,
# to make this kind of testing more robust

View File

@@ -61,6 +61,13 @@ fn main() -> Result<()> {
.long("enable-auth")
.takes_value(false)
.help("Enable authentication using ZenithJWT")
)
.arg(
Arg::with_name("repository-format")
.long("repository-format")
.takes_value(false)
.value_name("repository-format")
.help("Choose repository format, 'layered' or 'rocksdb'")
),
)
.subcommand(
@@ -131,8 +138,8 @@ fn main() -> Result<()> {
} else {
AuthType::Trust
};
local_env::init(pageserver_uri, tenantid, auth_type)
let repository_format = init_match.value_of("repository-format");
local_env::init(pageserver_uri, tenantid, auth_type, repository_format)
.with_context(|| "Failed to create config file")?;
}
@@ -151,6 +158,7 @@ fn main() -> Result<()> {
if let Err(e) = pageserver.init(
Some(&env.tenantid.to_string()),
init_match.is_present("enable-auth"),
init_match.value_of("repository-format"),
) {
eprintln!("pageserver init failed: {}", e);
exit(1);

View File

@@ -126,7 +126,7 @@ macro_rules! zid_newtype {
/// is separate from PostgreSQL timelines, and doesn't have those
/// limitations. A zenith timeline is identified by a 128-bit ID, which
/// is usually printed out as a hex string.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd, Serialize, Deserialize)]
pub struct ZTimelineId(ZId);
zid_newtype!(ZTimelineId);