Compare commits

...

62 Commits

Author SHA1 Message Date
Bojan Serafimov
20923e70f5 Merge branch 'main' into bojan-psbench-over-kvstore 2022-04-12 13:04:59 -04:00
Bojan Serafimov
98149f1a08 Parse output 2022-04-12 10:53:14 -04:00
Bojan Serafimov
fbc4206f2d Simplify 2022-04-11 21:50:50 -04:00
Bojan Serafimov
d614291c44 Test latest pages 2022-04-05 20:33:57 -04:00
Bojan Serafimov
35d8167f68 Measure that materializing all versions bloats by 0.3 2022-03-28 15:42:16 -04:00
Bojan Serafimov
2f7d9d2dd5 Count modified bits per wal (WIP) 2022-03-27 18:41:14 -04:00
Bojan Serafimov
21f9774ea4 Merge branch 'heikki-kvstore' into bojan-psbench-over-kvstore 2022-03-18 16:27:18 -04:00
Bojan Serafimov
098d7046f8 Improve test 2022-03-18 14:37:01 -04:00
Konstantin Knizhnik
a39de2997f Optimize reading versions for delta_layer
Store blob size in layer metadata for all layer types

Heikki: This is a squashed version of PR #1369
2022-03-18 14:35:13 +02:00
Heikki Linnakangas
d756921220 RFC fixes, per comments in the PR 2022-03-18 14:18:25 +02:00
Heikki Linnakangas
2bc9ed164f Merge remote-tracking branch 'origin/main' into heikki-kvstore 2022-03-18 12:08:17 +02:00
Heikki Linnakangas
8c4d270cde Fix InMemoryLayer::dump 2022-03-18 11:57:19 +02:00
Heikki Linnakangas
35584f7242 Bump magic IDs, to distinguish old file format from new 2022-03-18 11:57:19 +02:00
Heikki Linnakangas
12141523f6 Improve comments 2022-03-18 11:57:19 +02:00
Heikki Linnakangas
d383ed4e68 Add missing fsyncs 2022-03-18 11:57:19 +02:00
Heikki Linnakangas
13ec0ce7b2 fix formatting 2022-03-17 19:40:08 +02:00
Heikki Linnakangas
80fc133833 Add sequential scan tests 2022-03-17 17:12:30 +02:00
Heikki Linnakangas
3da14d56f2 Fix materialized page caching. 2022-03-17 17:12:30 +02:00
Heikki Linnakangas
b0b2093d00 Improve comments and tidy up the code in pgdatadir_mapping.rs. 2022-03-17 13:14:33 +02:00
Bojan Serafimov
02aa7c023a Add todos 2022-03-16 22:51:39 -04:00
Bojan Serafimov
180631da1f Remove todos 2022-03-16 22:40:36 -04:00
Bojan Serafimov
811d46f070 Fix 2022-03-16 22:39:55 -04:00
Bojan Serafimov
728f299641 Add hot page workload 2022-03-16 19:18:28 -04:00
Bojan Serafimov
fb49418e7f Trim dead code 2022-03-16 19:05:40 -04:00
Bojan Serafimov
887dc8f112 Print some stats 2022-03-16 19:01:54 -04:00
Bojan Serafimov
aa7b32d892 Simplify 2022-03-16 18:42:58 -04:00
Bojan Serafimov
d7ed9d8e01 cleanup 2022-03-16 17:40:19 -04:00
Bojan Serafimov
96c2b3a80a WIP working pageserver get_page client 2022-03-16 14:31:42 -04:00
Heikki Linnakangas
7560854370 Rename things in KeyPartition, per Bojan's suggestions. 2022-03-16 19:29:07 +02:00
Heikki Linnakangas
adbbb0a4c8 Merge remote-tracking branch 'origin/main' into heikki-kvstore 2022-03-16 19:08:45 +02:00
Heikki Linnakangas
6a264aaca3 Stopgap "fix" for test_parallel_copy failure in debug mode. 2022-03-14 19:54:38 +02:00
Heikki Linnakangas
60ed6b3710 Shave some CPU cycles from reading blobs from files.
This shows up in the 'perf' profile when running in debug mode. It's not as
significant in release mode, but still worth shaving.
2022-03-14 19:53:00 +02:00
Heikki Linnakangas
89690d7349 Prevent compaction from running at same time as GC.
For the same reasons that we previously prohibited concurrent checkpointing
and GC.
2022-03-14 14:22:04 +02:00
Heikki Linnakangas
09f2dff537 Refactor the checkpoint and compaction functions.
The concept of a "checkpoint" had become quite muddled. This tries to
clarify it again.
2022-03-14 13:22:46 +02:00
Heikki Linnakangas
2d8587f67d Separate flushing in-memory layer to disk from checkpoints.
When 'checkpoint_distance' is reached, freeze the current in-memory
layer directly in the WAL receiver thread. And to flush the frozen
layer to disk, launch a separate "layer flushing thread". This leaves
only the compaction duty to the checkpoint thread.
2022-03-14 11:37:22 +02:00
Heikki Linnakangas
c559c72ede Merge remote-tracking branch 'origin/main' into HEAD 2022-03-14 10:26:05 +02:00
Heikki Linnakangas
f06707badc Bugfix: a few constant keys were missing from collect_keyspace
As a result, you got "could not find data for key" errors.
2022-03-13 01:15:32 +02:00
Heikki Linnakangas
64cdd6064d Don't emit ClearVisibilityMapFlags records for non-existent blocks.
We create a ClearVisibilityMapFlags record for the VM page when a heap
WAL record indicates that the VM bit needs to be cleared. However,
sometimes the VM block does not exist: it seems that PostgreSQL
sometimes sets the clear-VM flag on WAL records even though the
corresponding VM page hasn't been initialized yet. There's no point in
trying to clear a bit on a non-existent page, so just skip emitting the
record if the VM page doesn't exist.

I'm not entirely sure why we're only seeing this bug with this PR; I
think it existed before. Maybe we were sloppier before and returned
an all-zeros page?
2022-03-13 01:14:58 +02:00
Heikki Linnakangas
4bd557ca61 Move RFC doc here.
From https://github.com/zenithdb/rfcs/pull/17
2022-03-12 11:35:28 +02:00
Heikki Linnakangas
ee40297758 Refactor keyspace code
Have separate classes for the KeySpace, a partitioning of the KeySpace
(KeyPartitioning), and a builder object used to construct the KeySpace.
Previously, KeyPartitioning did all those things, and it was a bit
confusing.
2022-03-11 16:24:13 +02:00
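
As a rough sketch of that split (type and field shapes here are assumptions for illustration, not the actual definitions in this PR):

use std::ops::Range;

pub struct Key; // placeholder for the repository key type

/// A set of keys, represented as ranges.
pub struct KeySpace {
    pub ranges: Vec<Range<Key>>,
}

/// A KeySpace divided into disjoint parts.
pub struct KeyPartitioning {
    pub parts: Vec<KeySpace>,
}

/// Builder that accumulates key ranges and produces a KeySpace.
pub struct KeySpaceBuilder {
    ranges: Vec<Range<Key>>,
}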
Heikki Linnakangas
d5b8380dae Improve comments on image layer.
Make it more explicit that if a key doesn't exist in an image layer, it
doesn't exist.
2022-03-11 09:47:09 +02:00
Heikki Linnakangas
bce2da4e55 Another 'tablespace' test fix. 2022-03-11 00:53:46 +02:00
Heikki Linnakangas
3948956e87 Fix pg_table_size() on a view 2022-03-10 23:35:24 +02:00
Heikki Linnakangas
a726b555fb Handle tablespaces gracefully.
We don't really support tablespaces. But this makes the 'tablespace'
Postgres regression test pass, like it did previously.
2022-03-10 22:56:37 +02:00
Heikki Linnakangas
0e3512aad0 Crank down logging again 2022-03-10 18:50:12 +02:00
Heikki Linnakangas
6fb566b46f Bump vendor/postgres to fix a bug with smgrnblocks() on newly created rel 2022-03-10 16:05:21 +02:00
Heikki Linnakangas
dd56eeefbf Crank up logging 2022-03-10 15:45:50 +02:00
Heikki Linnakangas
d19a293e7e Add a test for branching 2022-03-10 14:56:13 +02:00
Heikki Linnakangas
be4aebd7e9 silence clippy 2022-03-10 13:36:28 +02:00
Heikki Linnakangas
dac73328ba Fix bug where reldir was not written to image layer. 2022-03-10 13:20:08 +02:00
Heikki Linnakangas
fb79c7f1f0 Make compaction more concurrent 2022-03-10 13:20:08 +02:00
Heikki Linnakangas
e7bd74d558 Tidy up 2022-03-10 13:20:08 +02:00
Heikki Linnakangas
da8beffc95 Fix logical timeline size tracking 2022-03-10 13:20:08 +02:00
Heikki Linnakangas
98ec8418c4 Fix bug with the partitioning and GC 2022-03-10 13:20:08 +02:00
Heikki Linnakangas
92d1322cd5 comments, other cleanup 2022-03-10 13:20:08 +02:00
Heikki Linnakangas
2896d35a8b rustfmt and clippy fixes 2022-03-10 13:20:08 +02:00
Heikki Linnakangas
e096c62494 Misc fixes and stuff 2022-03-09 11:36:39 +02:00
Heikki Linnakangas
356f716d39 Fixes 2022-03-09 11:36:39 +02:00
Heikki Linnakangas
798ff26fb0 More work on compaction, and resurrect some unit tests 2022-03-09 11:36:39 +02:00
Heikki Linnakangas
28045890eb Work on compaction. 2022-03-09 11:36:39 +02:00
Heikki Linnakangas
6127b6638b Major storage format rewrite
Major changes and new concepts:

Simplify Repository to a value-store
------------------------------------

Move the responsibility for tracking relation metadata, like which relations
exist and what their sizes are, from Repository to a new module,
pgdatadir_mapping.rs. The interface to Repository is now simple key-value
PUT/GET operations.

It's still not just any old key-value store, though. A Repository is still
responsible for handling branching, and every GET operation comes with
an LSN.
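
A minimal sketch of that interface, with hypothetical names (this is not the actual Repository trait, just the shape described above):

use anyhow::Result;
use bytes::Bytes;

pub struct Lsn(pub u64); // LSN newtype, as in zenith_utils::lsn
pub struct Key;          // see the Key section below

pub trait KeyValueTimeline {
    /// Every read is versioned: return the value of `key` as of `lsn`.
    fn get(&self, key: Key, lsn: Lsn) -> Result<Bytes>;
    /// Record a new value for `key`, effective at `lsn`.
    fn put(&mut self, key: Key, lsn: Lsn, value: Bytes) -> Result<()>;
}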

Key
---

The key to the Repository key-value store is a Key struct, which consists
of a few integer fields. It's wide enough to store a full RelFileNode,
fork and block number, and to distinguish those from metadata keys.

See pgdatadir_mapping.rs for how relation blocks and metadata keys are
mapped to the Key struct.
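
For illustration, a struct along these lines fits that description (the field layout is a guess for the sketch; the real definition is in the PR):

/// Sketch: a discriminator plus enough integer fields to encode a full
/// RelFileNode (spcnode, dbnode, relnode), fork and block number.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub struct Key {
    pub kind: u8,    // separates relation-block keys from metadata keys
    pub spcnode: u32,
    pub dbnode: u32,
    pub relnode: u32,
    pub forknum: u8,
    pub blknum: u32,
}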

Store arbitrary key-ranges in the layer files
---------------------------------------------

The concept of a "segment" is gone. Each layer file can store an arbitrary
range of Keys.

TODO:

- Deleting keys, to reclaim space. This isn't visible to Postgres: dropping
  or truncating a relation works as you would expect if you look at it from
  the compute node. If you drop a relation, for example, it is removed from
  the metadata entry, so it appears to be gone. However, the layered
  repository implementation never reclaims the storage.

- Tracking "logical database size", for disk space quotas. That ought to
  be reimplemented now in pgdatadir_mapping.rs, or perhaps in walingest.rs.

- LSM compaction. The logic for checkpointing and creating image layers is
  very dumb. AFAIK the *read* code could already deal with a full-fledged
  LSM tree consisting of the delta and image layers, but there's no code to
  take a bunch of delta layers and compact them, and the heuristics for
  when to create image layers are pretty dumb.

- The code to track the layers is inefficient. All layers are just stored in
  a vector, and whenever we need to find a layer, we do a linear search in
  it.
2022-03-09 11:36:39 +02:00
Heikki Linnakangas
c7c1e19667 Use more generics, less dyn 2022-03-09 11:36:38 +02:00
8 changed files with 304 additions and 0 deletions


@@ -185,6 +185,9 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()
    // Initialize logger
    let log_file = logging::init(LOG_FILE_NAME, daemonize)?;

    // TODO init only if configured
    pageserver::wal_metadata::init(conf).expect("wal_metadata init failed");

    info!("version: {}", GIT_VERSION);

    // TODO: Check that it looks like a valid repository before going further


@@ -0,0 +1,158 @@
//! Pageserver benchmark tool
//!
//! Usually it's easier to write python perf tests, but here the performance
//! of the tester matters, and the API is easier to work with from rust.
use std::{collections::HashSet, io::{BufRead, BufReader, Cursor}, time::Duration};
use pageserver::wal_metadata::{Page, WalEntryMetadata};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use bytes::{BufMut, BytesMut};
use clap::{App, Arg};
use std::fs::File;
use zenith_utils::{GIT_VERSION, lsn::Lsn, pq_proto::{BeMessage, FeMessage}};
use std::time::Instant;
use anyhow::Result;

const BYTES_IN_PAGE: usize = 8 * 1024;

pub fn read_lines_buffered(file_name: &str) -> impl Iterator<Item = String> {
    BufReader::new(File::open(file_name).unwrap())
        .lines()
        .map(|result| result.unwrap())
}

/// Issue a single get_page request over the raw pagestream connection and
/// return the 8KiB page image.
pub async fn get_page(
    pagestream: &mut tokio::net::TcpStream,
    lsn: &Lsn,
    page: &Page,
    latest: bool,
) -> anyhow::Result<Vec<u8>> {
    let latest: u8 = if latest { 1 } else { 0 };

    // Build the request: tag byte 2 (get_page), "latest" flag, LSN and page
    // id, all wrapped in a CopyData message.
    let msg = {
        let query = {
            let mut query = BytesMut::new();
            query.put_u8(2); // Specifies get_page query
            query.put_u8(latest);
            query.put_u64(lsn.0);
            page.write(&mut query).await?;
            query.freeze()
        };

        let mut buf = BytesMut::new();
        let copy_msg = BeMessage::CopyData(&query);
        BeMessage::write(&mut buf, &copy_msg)?;
        buf.freeze()
    };

    // Use write_all: a plain write() may send only part of the message.
    pagestream.write_all(&msg).await?;

    let response = match FeMessage::read_fut(pagestream).await? {
        Some(FeMessage::CopyData(page)) => page,
        r => panic!("Expected CopyData message, got: {:?}", r),
    };

    // The first byte of the response is a tag: 102 means a page image
    // follows, 103 means an error message.
    let page = {
        let mut cursor = Cursor::new(response);
        let tag = AsyncReadExt::read_u8(&mut cursor).await?;

        match tag {
            102 => {
                let mut page = Vec::<u8>::new();
                cursor.read_to_end(&mut page).await?;
                if page.len() != BYTES_IN_PAGE {
                    panic!("Expected 8kb page, got: {:?}", page.len());
                }
                page
            }
            103 => {
                let mut bytes = Vec::<u8>::new();
                cursor.read_to_end(&mut bytes).await?;
                let message = String::from_utf8(bytes)?;
                panic!("Got error message: {}", message);
            }
            _ => panic!("Unhandled tag {:?}", tag),
        }
    };

    Ok(page)
}

#[tokio::main]
async fn main() -> Result<()> {
    let arg_matches = App::new("LALALA")
        .about("lalala")
        .version(GIT_VERSION)
        .arg(
            Arg::new("wal_metadata_file")
                .help("Path to wal metadata file")
                .required(true)
                .index(1),
        )
        .arg(
            Arg::new("tenant_hex")
                .help("TODO")
                .required(true)
                .index(2),
        )
        .arg(
            Arg::new("timeline")
                .help("TODO")
                .required(true)
                .index(3),
        )
        .get_matches();

    let metadata_file = arg_matches.value_of("wal_metadata_file").unwrap();
    let tenant_hex = arg_matches.value_of("tenant_hex").unwrap();
    let timeline = arg_matches.value_of("timeline").unwrap();

    // Parse log lines
    let wal_metadata: Vec<WalEntryMetadata> = read_lines_buffered(metadata_file)
        .map(|line| serde_json::from_str(&line).expect("corrupt metadata file"))
        .collect();

    // Get raw TCP connection to the pageserver postgres protocol port
    let mut socket = tokio::net::TcpStream::connect("localhost:15000").await?;
    let (client, conn) = tokio_postgres::Config::new()
        .host("127.0.0.1")
        .port(15000)
        .dbname("postgres")
        .user("zenith_admin")
        .connect_raw(&mut socket, tokio_postgres::NoTls)
        .await?;

    // Enter pagestream protocol
    let init_query = format!("pagestream {} {}", tenant_hex, timeline);
    tokio::select! {
        _ = conn => panic!("AAAA"),
        _ = client.query(init_query.as_str(), &[]) => (),
    };

    // Derive some variables
    let total_wal_size: usize = wal_metadata.iter().map(|m| m.size).sum();
    let affected_pages: HashSet<_> = wal_metadata
        .iter()
        .flat_map(|m| m.affected_pages.clone())
        .collect();
    let latest_lsn = wal_metadata.iter().map(|m| m.lsn).max().unwrap();

    // Get all latest pages
    let mut durations: Vec<Duration> = vec![];
    for page in &affected_pages {
        let start = Instant::now();
        let _page_bytes = get_page(&mut socket, &latest_lsn, page, true).await?;
        durations.push(start.elapsed());
    }
    durations.sort();

    // Results are a space separated table of "metric_name value unit", for ease of parsing
    println!("test_param num_pages {}", affected_pages.len());
    println!("test_param num_wal_entries {}", wal_metadata.len());
    println!("test_param total_wal_size {} bytes", total_wal_size);
    println!("lower_is_better fastest {} microseconds", durations.first().unwrap().as_micros());
    println!("lower_is_better median {} microseconds", durations[durations.len() / 2].as_micros());
    println!("lower_is_better p99 {} microseconds", durations[durations.len() - 1 - durations.len() / 100].as_micros());
    println!("lower_is_better slowest {} microseconds", durations.last().unwrap().as_micros());

    Ok(())
}


@@ -19,6 +19,7 @@ pub mod walingest;
pub mod walreceiver;
pub mod walrecord;
pub mod walredo;
pub mod wal_metadata;
use lazy_static::lazy_static;
use tracing::info;


@@ -52,6 +52,7 @@ use zenith_utils::{
    zid::{ZTenantId, ZTimelineId},
};

use crate::config::PageServerConf;
use crate::layered_repository::writeback_ephemeral_file;
use crate::repository::Key;


@@ -0,0 +1,78 @@
use anyhow::Result;
use once_cell::sync::OnceCell;
use zenith_utils::lsn::Lsn;
use std::fs::File;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use bytes::{BufMut, Bytes, BytesMut};
use serde::{Deserialize, Serialize};

use crate::{config::PageServerConf, repository::Key, walrecord::DecodedBkpBlock};

pub static WAL_METADATA_FILE: OnceCell<File> = OnceCell::new();

#[derive(Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Debug, Serialize, Deserialize)]
pub struct Page {
    spcnode: u32,
    dbnode: u32,
    relnode: u32,
    forknum: u8,
    blkno: u32,
}

impl Page {
    pub async fn read<Reader>(buf: &mut Reader) -> Result<Page>
    where
        Reader: tokio::io::AsyncRead + Unpin,
    {
        let spcnode = buf.read_u32().await?;
        let dbnode = buf.read_u32().await?;
        let relnode = buf.read_u32().await?;
        let forknum = buf.read_u8().await?;
        let blkno = buf.read_u32().await?;
        Ok(Page { spcnode, dbnode, relnode, forknum, blkno })
    }

    pub async fn write(&self, buf: &mut BytesMut) -> Result<()> {
        buf.put_u32(self.spcnode);
        buf.put_u32(self.dbnode);
        buf.put_u32(self.relnode);
        buf.put_u8(self.forknum);
        buf.put_u32(self.blkno);
        Ok(())
    }
}

impl From<&DecodedBkpBlock> for Page {
    fn from(blk: &DecodedBkpBlock) -> Self {
        Page {
            spcnode: blk.rnode_spcnode,
            dbnode: blk.rnode_dbnode,
            relnode: blk.rnode_relnode,
            forknum: blk.forknum,
            blkno: blk.blkno,
        }
    }
}

#[derive(Debug, Serialize, Deserialize)]
pub struct WalEntryMetadata {
    pub lsn: Lsn,
    pub size: usize,
    pub affected_pages: Vec<Page>,
}

pub fn init(conf: &'static PageServerConf) -> Result<()> {
    let wal_metadata_file_path = conf.workdir.join("wal_metadata.log");
    WAL_METADATA_FILE
        .set(File::create(wal_metadata_file_path)?)
        .expect("wal_metadata file is already created");
    Ok(())
}

pub fn write(wal_meta: WalEntryMetadata) -> Result<()> {
    // No-op unless init() has set up the metadata file
    if let Some(mut file) = WAL_METADATA_FILE.get() {
        let mut line = serde_json::to_string(&wal_meta)?;
        line.push('\n');
        std::io::prelude::Write::write_all(&mut file, line.as_bytes())?;
    }
    Ok(())
}


@@ -82,6 +82,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
    ) -> Result<()> {
        let mut modification = timeline.begin_modification(lsn);
        let recdata_len = recdata.len();
        let mut decoded = decode_wal_record(recdata);
        let mut buf = decoded.record.clone();
        buf.advance(decoded.main_data_offset);
@@ -249,6 +250,13 @@ impl<'a, R: Repository> WalIngest<'a, R> {
            self.ingest_decoded_block(&mut modification, lsn, &decoded, blk)?;
        }

        // Emit wal entry metadata, if configured to do so
        crate::wal_metadata::write(crate::wal_metadata::WalEntryMetadata {
            lsn,
            size: recdata_len,
            affected_pages: decoded.blocks.iter().map(|blk| blk.into()).collect(),
        })?;

        // If checkpoint data was updated, store the new version in the repository
        if self.checkpoint_modified {
            let new_checkpoint_bytes = self.checkpoint.encode();


@@ -638,6 +638,13 @@ class ZenithEnv:
""" Get list of safekeeper endpoints suitable for wal_acceptors GUC """
return ','.join([f'localhost:{wa.port.pg}' for wa in self.safekeepers])
def run_psbench(self, timeline):
wal_metadata_filename = os.path.join(self.repo_dir, "wal_metadata.log")
psbench_binpath = os.path.join(str(zenith_binpath), 'psbench')
tenant_hex = self.initial_tenant.hex
args = [psbench_binpath, wal_metadata_filename, tenant_hex, timeline]
return subprocess.run(args, capture_output=True).stdout.decode("UTF-8").strip()
@cached_property
def auth_keys(self) -> AuthKeys:
pub = (Path(self.repo_dir) / 'auth_public_key.pem').read_bytes()


@@ -0,0 +1,48 @@
from contextlib import closing

from fixtures.zenith_fixtures import ZenithEnv, PgBin
from fixtures.benchmark_fixture import MetricReport, ZenithBenchmarker


def test_get_page(zenith_simple_env: ZenithEnv,
                  zenbenchmark: ZenithBenchmarker,
                  pg_bin: PgBin):
    env = zenith_simple_env
    env.zenith_cli.create_branch("test_pageserver", "empty")
    pg = env.postgres.create_start('test_pageserver')

    tenant_hex = env.initial_tenant.hex
    timeline = pg.safe_psql("SHOW zenith.zenith_timeline")[0][0]

    # Long-lived cursor, useful for flushing
    psconn = env.pageserver.connect()
    pscur = psconn.cursor()

    with closing(pg.connect()) as conn:
        with conn.cursor() as cur:
            workload = "pgbench"
            print(f"Running workload {workload}")

            if workload == "hot page":
                cur.execute('create table t (i integer);')
                cur.execute('insert into t values (0);')
                for i in range(100000):
                    cur.execute(f'update t set i = {i};')
            elif workload == "pgbench":
                pg_bin.run_capture(['pgbench', '-s5', '-i', pg.connstr()])
                pg_bin.run_capture(['pgbench', '-c1', '-t5000', pg.connstr()])
            elif workload == "pgbench big":
                pg_bin.run_capture(['pgbench', '-s100', '-i', pg.connstr()])
                pg_bin.run_capture(['pgbench', '-c1', '-t100000', pg.connstr()])
            elif workload == "pgbench long":
                pg_bin.run_capture(['pgbench', '-s100', '-i', pg.connstr()])
                pg_bin.run_capture(['pgbench', '-c1', '-t1000000', pg.connstr()])

    pscur.execute(f"checkpoint {env.initial_tenant.hex} {timeline} 0")

    output = env.run_psbench(timeline)
    for line in output.split("\n"):
        tokens = line.split(" ")
        report = tokens[0]
        name = tokens[1]
        value = tokens[2]
        unit = tokens[3] if len(tokens) > 3 else ""
        zenbenchmark.record(name, value, unit, report=report)
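
For reference, the psbench output parsed here follows the "report name value [unit]" shape printed by the tool, e.g. (values illustrative):

test_param num_pages 1234
test_param num_wal_entries 56789
test_param total_wal_size 104857600 bytes
lower_is_better median 150 microseconds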