Compare commits


12 Commits

Author           SHA1        Message                    Date
Dmitry Ivanov    15425325c4  Fix connection string      2022-10-03 17:27:11 +03:00
Dmitry Ivanov    f602e21aaf  Improve logging            2022-09-30 10:34:52 +03:00
Dmitry Ivanov    a9c2124267  The DOT!                   2022-09-30 10:31:40 +03:00
Dmitry Rodionov  4539f5597e  fmt, fix arg node          2022-09-29 19:03:03 +03:00
Dmitry Ivanov    1bac60ba31  Make tar less verbose      2022-09-29 18:42:54 +03:00
Dmitry Ivanov    8832cc65fe  Even more verbose prints   2022-09-29 18:41:07 +03:00
Dmitry Ivanov    cabbc33cbf  Print more params          2022-09-29 18:32:17 +03:00
Dmitry Ivanov    13646d1f99  Print relid                2022-09-29 18:11:18 +03:00
Dmitry Ivanov    a0091038e3  Run psql                   2022-09-29 18:02:53 +03:00
Dmitry Ivanov    3093d5ff94  One more fix               2022-09-29 17:46:55 +03:00
Dmitry Ivanov    9d903b6e8b  Improvements               2022-09-29 17:40:36 +03:00
Dmitry Rodionov  3a3efe5a96  wip backup script          2022-09-29 15:25:41 +03:00
51 changed files with 307 additions and 1216 deletions


@@ -127,8 +127,8 @@ jobs:
target/
# Fall back to older versions of the key, if no cache for current Cargo.lock was found
key: |
v9-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ hashFiles('Cargo.lock') }}
v9-${{ runner.os }}-${{ matrix.build_type }}-cargo-
v8-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ hashFiles('Cargo.lock') }}
v8-${{ runner.os }}-${{ matrix.build_type }}-cargo-
- name: Cache postgres v14 build
id: cache_pg_14
@@ -389,7 +389,7 @@ jobs:
!~/.cargo/registry/src
~/.cargo/git/
target/
key: v9-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ hashFiles('Cargo.lock') }}
key: v8-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ hashFiles('Cargo.lock') }}
- name: Get Neon artifact
uses: ./.github/actions/download
@@ -768,5 +768,5 @@ jobs:
- name: Re-deploy proxy
run: |
DOCKER_TAG=${{needs.tag.outputs.build-tag}}
helm upgrade ${{ matrix.proxy_job }} neondatabase/neon-proxy --namespace neon-proxy --install -f .github/helm-values/${{ matrix.proxy_config }}.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s
helm upgrade ${{ matrix.proxy_job }}-scram neondatabase/neon-proxy --namespace neon-proxy --install -f .github/helm-values/${{ matrix.proxy_config }}-scram.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s
helm upgrade ${{ matrix.proxy_job }} neondatabase/neon-proxy --namespace default --install -f .github/helm-values/${{ matrix.proxy_config }}.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s
helm upgrade ${{ matrix.proxy_job }}-scram neondatabase/neon-proxy --namespace default --install -f .github/helm-values/${{ matrix.proxy_config }}-scram.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s


@@ -106,7 +106,7 @@ jobs:
!~/.cargo/registry/src
~/.cargo/git
target
key: v5-${{ runner.os }}-cargo-${{ hashFiles('./Cargo.lock') }}-rust
key: v4-${{ runner.os }}-cargo-${{ hashFiles('./Cargo.lock') }}-rust
- name: Run cargo clippy
run: ./run_clippy.sh

Cargo.lock (generated)

@@ -497,10 +497,8 @@ dependencies = [
"chrono",
"clap 3.2.16",
"env_logger",
"futures",
"hyper",
"log",
"notify",
"postgres",
"regex",
"serde",
@@ -542,11 +540,11 @@ dependencies = [
"git-version",
"nix",
"once_cell",
"pageserver_api",
"pageserver",
"postgres",
"regex",
"reqwest",
"safekeeper_api",
"safekeeper",
"serde",
"serde_with",
"tar",
@@ -1074,15 +1072,6 @@ dependencies = [
"winapi",
]
[[package]]
name = "fsevent-sys"
version = "4.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2"
dependencies = [
"libc",
]
[[package]]
name = "futures"
version = "0.3.21"
@@ -1504,26 +1493,6 @@ dependencies = [
"str_stack",
]
[[package]]
name = "inotify"
version = "0.9.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff"
dependencies = [
"bitflags",
"inotify-sys",
"libc",
]
[[package]]
name = "inotify-sys"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb"
dependencies = [
"libc",
]
[[package]]
name = "instant"
version = "0.1.12"
@@ -1583,26 +1552,6 @@ dependencies = [
"simple_asn1",
]
[[package]]
name = "kqueue"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d6112e8f37b59803ac47a42d14f1f3a59bbf72fc6857ffc5be455e28a691f8e"
dependencies = [
"kqueue-sys",
"libc",
]
[[package]]
name = "kqueue-sys"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8367585489f01bc55dd27404dcf56b95e6da061a256a666ab23be9ba96a2e587"
dependencies = [
"bitflags",
"libc",
]
[[package]]
name = "kstring"
version = "1.0.6"
@@ -1848,24 +1797,6 @@ dependencies = [
"minimal-lexical",
]
[[package]]
name = "notify"
version = "5.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed2c66da08abae1c024c01d635253e402341b4060a12e99b31c7594063bf490a"
dependencies = [
"bitflags",
"crossbeam-channel",
"filetime",
"fsevent-sys",
"inotify",
"kqueue",
"libc",
"mio",
"walkdir",
"winapi",
]
[[package]]
name = "num-bigint"
version = "0.4.3"
@@ -2044,7 +1975,6 @@ dependencies = [
"nix",
"num-traits",
"once_cell",
"pageserver_api",
"postgres",
"postgres-protocol",
"postgres-types",
@@ -2059,7 +1989,6 @@ dependencies = [
"serde_json",
"serde_with",
"signal-hook",
"svg_fmt",
"tar",
"tempfile",
"thiserror",
@@ -2074,17 +2003,6 @@ dependencies = [
"workspace_hack",
]
[[package]]
name = "pageserver_api"
version = "0.1.0"
dependencies = [
"const_format",
"serde",
"serde_with",
"utils",
"workspace_hack",
]
[[package]]
name = "parking_lot"
version = "0.11.2"
@@ -2973,7 +2891,6 @@ dependencies = [
"postgres_ffi",
"regex",
"remote_storage",
"safekeeper_api",
"serde",
"serde_json",
"serde_with",
@@ -2989,17 +2906,6 @@ dependencies = [
"workspace_hack",
]
[[package]]
name = "safekeeper_api"
version = "0.1.0"
dependencies = [
"const_format",
"serde",
"serde_with",
"utils",
"workspace_hack",
]
[[package]]
name = "same-file"
version = "1.0.6"
@@ -3334,12 +3240,6 @@ version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601"
[[package]]
name = "svg_fmt"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8fb1df15f412ee2e9dfc1c504260fa695c1c3f10fe9f4a6ee2d2184d7d6450e2"
[[package]]
name = "symbolic-common"
version = "8.8.0"
@@ -4242,7 +4142,6 @@ dependencies = [
"bstr",
"bytes",
"chrono",
"crossbeam-utils",
"either",
"fail",
"hashbrown",


@@ -8,10 +8,8 @@ anyhow = "1.0"
chrono = "0.4"
clap = "3.0"
env_logger = "0.9"
futures = "0.3.13"
hyper = { version = "0.14", features = ["full"] }
log = { version = "0.4", features = ["std", "serde"] }
notify = "5.0.0"
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
regex = "1"
serde = { version = "1.0", features = ["derive"] }


@@ -258,7 +258,14 @@ impl ComputeNode {
.spawn()
.expect("cannot start postgres process");
wait_for_postgres(&mut pg, pgdata_path)?;
// Try default Postgres port if it is not provided
let port = self
.spec
.cluster
.settings
.find("port")
.unwrap_or_else(|| "5432".to_string());
wait_for_postgres(&mut pg, &port, pgdata_path)?;
// If connection fails,
// it may be the old node with `zenith_admin` superuser.


@@ -1,19 +1,18 @@
use std::fmt::Write;
use std::fs;
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::net::{SocketAddr, TcpStream};
use std::os::unix::fs::PermissionsExt;
use std::path::Path;
use std::process::Child;
use std::time::{Duration, Instant};
use std::str::FromStr;
use std::{fs, thread, time};
use anyhow::{bail, Result};
use postgres::{Client, Transaction};
use serde::Deserialize;
use notify::{RecursiveMode, Watcher};
const POSTGRES_WAIT_TIMEOUT: Duration = Duration::from_millis(60 * 1000); // milliseconds
const POSTGRES_WAIT_TIMEOUT: u64 = 60 * 1000; // milliseconds
/// Rust representation of Postgres role info with only those fields
/// that matter for us.
@@ -231,85 +230,52 @@ pub fn get_existing_dbs(client: &mut Client) -> Result<Vec<Database>> {
Ok(postgres_dbs)
}
/// Wait for Postgres to become ready to accept connections. It's ready to
/// accept connections when the state-field in `pgdata/postmaster.pid` says
/// 'ready'.
pub fn wait_for_postgres(pg: &mut Child, pgdata: &Path) -> Result<()> {
/// Wait for Postgres to become ready to accept connections:
/// - state should be `ready` in the `pgdata/postmaster.pid`
/// - and we should be able to connect to 127.0.0.1:5432
pub fn wait_for_postgres(pg: &mut Child, port: &str, pgdata: &Path) -> Result<()> {
let pid_path = pgdata.join("postmaster.pid");
let mut slept: u64 = 0; // ms
let pause = time::Duration::from_millis(100);
// PostgreSQL writes line "ready" to the postmaster.pid file, when it has
// completed initialization and is ready to accept connections. We want to
// react quickly and perform the rest of our initialization as soon as
// PostgreSQL starts accepting connections. Use 'notify' to be notified
// whenever the PID file is changed, and whenever it changes, read it to
// check if it's now "ready".
//
// You cannot actually watch a file before it exists, so we first watch the
// data directory, and once the postmaster.pid file appears, we switch to
// watch the file instead. We also wake up every 100 ms to poll, just in
// case we miss some events for some reason. Not strictly necessary, but
// better safe than sorry.
let (tx, rx) = std::sync::mpsc::channel();
let mut watcher = notify::recommended_watcher(move |res| {
let _ = tx.send(res);
})?;
watcher.watch(pgdata, RecursiveMode::NonRecursive)?;
let timeout = time::Duration::from_millis(10);
let addr = SocketAddr::from_str(&format!("127.0.0.1:{}", port)).unwrap();
let started_at = Instant::now();
let mut postmaster_pid_seen = false;
loop {
// Sleep POSTGRES_WAIT_TIMEOUT at most (a bit longer in practice if we count the TCP connect timeout,
// but postgres starts listening almost immediately, even if it is not really
// ready to accept connections).
if slept >= POSTGRES_WAIT_TIMEOUT {
bail!("timed out while waiting for Postgres to start");
}
if let Ok(Some(status)) = pg.try_wait() {
// Postgres exited, that is not what we expected, bail out earlier.
let code = status.code().unwrap_or(-1);
bail!("Postgres exited unexpectedly with code {}", code);
}
let res = rx.recv_timeout(Duration::from_millis(100));
log::debug!("woken up by notify: {res:?}");
// If there are multiple events in the channel already, we only need to
// check once. Swallow the extra events before we go ahead to check the
// pid file.
while let Ok(res) = rx.try_recv() {
log::debug!("swallowing extra event: {res:?}");
}
// Check that we can open pid file first.
if let Ok(file) = File::open(&pid_path) {
if !postmaster_pid_seen {
log::debug!("postmaster.pid appeared");
watcher
.unwatch(pgdata)
.expect("Failed to remove pgdata dir watch");
watcher
.watch(&pid_path, RecursiveMode::NonRecursive)
.expect("Failed to add postmaster.pid file watch");
postmaster_pid_seen = true;
}
let file = BufReader::new(file);
let last_line = file.lines().last();
// Pid file could be there and we could read it, but it could be empty, for example.
if let Some(Ok(line)) = last_line {
let status = line.trim();
log::debug!("last line of postmaster.pid: {status:?}");
let can_connect = TcpStream::connect_timeout(&addr, timeout).is_ok();
// Now Postgres is ready to accept connections
if status == "ready" {
if status == "ready" && can_connect {
break;
}
}
}
// Give up after POSTGRES_WAIT_TIMEOUT.
let duration = started_at.elapsed();
if duration >= POSTGRES_WAIT_TIMEOUT {
bail!("timed out while waiting for Postgres to start");
}
thread::sleep(pause);
slept += 100;
}
log::info!("PostgreSQL is now running, continuing to configure it");
Ok(())
}
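The rewritten wait loop looks for two signals at once: the last line of `pgdata/postmaster.pid` must say `ready`, and a TCP connection to the configured port must succeed. A minimal Python sketch of the same polling logic, e.g. for use from a test harness (the data directory path and port below are illustrative assumptions, not part of the patch):

```python
import socket
import time
from pathlib import Path

def wait_for_postgres(pgdata: Path, port: int = 5432, timeout_ms: int = 60_000) -> None:
    """Poll until postmaster.pid reports 'ready' and the port accepts connections."""
    pid_path = pgdata / "postmaster.pid"
    waited = 0
    while waited < timeout_ms:
        try:
            last_line = pid_path.read_text().splitlines()[-1].strip()
        except (FileNotFoundError, IndexError):
            last_line = ""  # pid file not there yet, or still empty
        try:
            with socket.create_connection(("127.0.0.1", port), timeout=0.01):
                can_connect = True
        except OSError:
            can_connect = False
        if last_line == "ready" and can_connect:
            return
        time.sleep(0.1)
        waited += 100
    raise TimeoutError("timed out while waiting for Postgres to start")

# wait_for_postgres(Path("/tmp/pgdata"), port=5432)  # example call
```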


@@ -19,9 +19,7 @@ thiserror = "1"
nix = "0.23"
reqwest = { version = "0.11", default-features = false, features = ["blocking", "json", "rustls-tls"] }
# Note: Do not directly depend on pageserver or safekeeper; use pageserver_api or safekeeper_api
# instead, so that recompile times are better.
pageserver_api = { path = "../libs/pageserver_api" }
safekeeper_api = { path = "../libs/safekeeper_api" }
pageserver = { path = "../pageserver" }
safekeeper = { path = "../safekeeper" }
utils = { path = "../libs/utils" }
workspace_hack = { version = "0.1", path = "../workspace_hack" }


@@ -12,12 +12,12 @@ use control_plane::local_env::{EtcdBroker, LocalEnv};
use control_plane::safekeeper::SafekeeperNode;
use control_plane::storage::PageServerNode;
use control_plane::{etcd, local_env};
use pageserver_api::models::TimelineInfo;
use pageserver_api::{
use pageserver::config::defaults::{
DEFAULT_HTTP_LISTEN_ADDR as DEFAULT_PAGESERVER_HTTP_ADDR,
DEFAULT_PG_LISTEN_ADDR as DEFAULT_PAGESERVER_PG_ADDR,
};
use safekeeper_api::{
use pageserver::http::models::TimelineInfo;
use safekeeper::defaults::{
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_SAFEKEEPER_HTTP_PORT,
DEFAULT_PG_LISTEN_PORT as DEFAULT_SAFEKEEPER_PG_PORT,
};


@@ -285,7 +285,7 @@ impl LocalEnv {
branch_name: &str,
tenant_id: TenantId,
) -> Option<TimelineId> {
self.branch_name_mappings
dbg!(&self.branch_name_mappings)
.get(branch_name)?
.iter()
.find(|(mapped_tenant_id, _)| mapped_tenant_id == &tenant_id)


@@ -12,7 +12,7 @@ use nix::unistd::Pid;
use postgres::Config;
use reqwest::blocking::{Client, RequestBuilder, Response};
use reqwest::{IntoUrl, Method};
use safekeeper_api::models::TimelineCreateRequest;
use safekeeper::http::models::TimelineCreateRequest;
use thiserror::Error;
use utils::{
connstring::connection_address,


@@ -11,7 +11,7 @@ use anyhow::{bail, Context};
use nix::errno::Errno;
use nix::sys::signal::{kill, Signal};
use nix::unistd::Pid;
use pageserver_api::models::{
use pageserver::http::models::{
TenantConfigRequest, TenantCreateRequest, TenantInfo, TimelineCreateRequest, TimelineInfo,
};
use postgres::{Config, NoTls};
@@ -61,7 +61,7 @@ impl ResponseErrorMessageExt for Response {
let url = self.url().to_owned();
Err(PageserverHttpError::Response(
match self.json::<HttpErrorBody>() {
Ok(err_body) => format!("Response error: {}", err_body.msg),
Ok(err_body) => format!("Error: {}", err_body.msg),
Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
},
))
@@ -181,15 +181,14 @@ impl PageServerNode {
new_timeline_id: Option<TimelineId>,
pg_version: u32,
) -> anyhow::Result<TimelineId> {
let initial_tenant_id = self.tenant_create(new_tenant_id, HashMap::new())
.context("Failed to create tenant")?;
let initial_tenant_id = self.tenant_create(new_tenant_id, HashMap::new())?;
let initial_timeline_info = self.timeline_create(
initial_tenant_id,
new_timeline_id,
None,
None,
Some(pg_version),
).context("Failed to create timeline")?;
)?;
Ok(initial_timeline_info.timeline_id)
}
@@ -420,11 +419,6 @@ impl PageServerNode {
.map(|x| x.parse::<NonZeroU64>())
.transpose()
.context("Failed to parse 'max_lsn_wal_lag' as non zero integer")?,
trace_read_requests: settings
.remove("trace_read_requests")
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'trace_read_requests' as bool")?,
};
if !settings.is_empty() {
bail!("Unrecognized tenant settings: {settings:?}")


@@ -96,7 +96,7 @@ A single virtual environment with all dependencies is described in the single `P
sudo apt install python3.9
```
- Install `poetry`
- Exact version of `poetry` is not important, see installation instructions available at poetry's [website](https://python-poetry.org/docs/#installation).
- Exact version of `poetry` is not important, see installation instructions available at poetry's [website](https://python-poetry.org/docs/#installation)`.
- Install dependencies via `./scripts/pysync`.
- Note that CI uses a specific Python version (look for `PYTHON_VERSION` [here](https://github.com/neondatabase/docker-images/blob/main/rust/Dockerfile)),
so if you have a different version, some linting tools can yield different results locally vs in CI.

hack/demo.py (new executable file)

@@ -0,0 +1,111 @@
#!/usr/bin/env python3
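"""Demo helper: spin up a throwaway Postgres node with testgres, take a base
backup, import it into a locally running Neon pageserver as a new
tenant/timeline via neon_local, start a compute for it, and open psql."""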
import argparse
import json
import os
import subprocess
import sys
import textwrap
import uuid
from pathlib import Path
import testgres
def run_command(args):
print('> Cmd:', ' '.join(args))
p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
ret = p.wait()
output = p.stdout.read().strip()
if output:
print(textwrap.indent(output, '>> '))
if ret != 0:
raise subprocess.CalledProcessError(ret, args)
def make_tarfile(output_filename, source_dir):
print("* Packing the backup into a tarball")
cmd = ["tar", r"--transform=s/\.\///", "-C", str(source_dir), "-cf", str(output_filename), "."]
run_command(cmd)
def create_tenant(tenant_id):
print("* Creating a new tenant")
cmd = ["neon_local", "tenant", "create", f"--tenant-id={tenant_id}"]
run_command(cmd)
def import_backup(args, backup_dir: Path):
tar = Path('/tmp/base.tar')
make_tarfile(tar, backup_dir / 'data')
print("* Importing the timeline into the pageserver")
manifest = json.loads((backup_dir / "data" / "backup_manifest").read_text())
start_lsn = manifest["WAL-Ranges"][0]["Start-LSN"]
end_lsn = manifest["WAL-Ranges"][0]["End-LSN"]
print("> LSNs:", start_lsn, end_lsn)
cmd = (
"neon_local timeline import "
f"--tenant-id {args.tenant_id} "
f"--base-lsn {start_lsn} "
f"--end-lsn {end_lsn} "
f"--base-tarfile {tar} "
f"--timeline-id {args.timeline_id} "
f"--node-name {args.node}"
)
run_command(cmd.split())
def debug_prints(node):
tuples = node.execute("table foo")
oid = node.execute("select 'foo'::regclass::oid")[0][0]
print("> foo's tuples:", tuples, "&", "oid:", oid)
print("> DBs:", node.execute("select oid, datname from pg_database"))
def main(args):
print("* Creating a node")
node = testgres.get_new_node()
node.init(unix_sockets=False, allow_streaming=True).start()
node.execute("create table foo as select 1")
debug_prints(node)
# node.pgbench_init(scale=1)
print("* Creating a backup")
backup = node.backup()
backup_dir = Path(backup.base_dir)
print("> Backup dir:", backup_dir)
# pr = backup.spawn_primary().start()
# debug_prints(pr)
# exit(1)
create_tenant(args.tenant_id)
import_backup(args, backup_dir)
print("> Tenant:", args.tenant_id)
print("> Timeline:", args.timeline_id)
print("> Node:", args.node)
print("* Starting postgres")
cmd = ["neon_local", "pg", "start", f"--tenant-id={args.tenant_id}", f"--timeline-id={args.timeline_id}", args.node]
run_command(cmd)
print("* Opening psql session...")
cmd = ["psql", f"host=127.0.0.1 port=55432 user={os.getlogin()} dbname=postgres"]
subprocess.call(cmd)
if __name__ == "__main__":
tenant_id = uuid.uuid4().hex
parser = argparse.ArgumentParser()
parser.add_argument("--tenant-id", default=tenant_id)
parser.add_argument("--timeline-id", default=tenant_id)
parser.add_argument("node")
args = parser.parse_args(sys.argv[1:])
main(args)
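For reference, a hedged usage sketch of the script above (the node name "demo" is arbitrary and the path assumes the script is invoked from the repository root):

```python
#!/usr/bin/env python3
# Hypothetical driver: equivalent to running
#   ./hack/demo.py --tenant-id=<id> --timeline-id=<id> demo
# with a freshly generated id, mirroring the script's own __main__ block.
import subprocess
import uuid

new_id = uuid.uuid4().hex
subprocess.run(
    ["./hack/demo.py", f"--tenant-id={new_id}", f"--timeline-id={new_id}", "demo"],
    check=True,
)
```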


@@ -1,12 +0,0 @@
[package]
name = "pageserver_api"
version = "0.1.0"
edition = "2021"
[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_with = "1.12.0"
const_format = "0.2.21"
utils = { path = "../utils" }
workspace_hack = { version = "0.1", path = "../../workspace_hack" }


@@ -1,9 +0,0 @@
use const_format::formatcp;
/// Public API types
pub mod models;
pub const DEFAULT_PG_LISTEN_PORT: u16 = 64000;
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 9898;
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");


@@ -3,11 +3,9 @@
#![allow(non_snake_case)]
// bindgen creates some unsafe code with no doc comments.
#![allow(clippy::missing_safety_doc)]
// noted at 1.63 that in many cases there's a u32 -> u32 transmutes in bindgen code.
#![allow(clippy::useless_transmute)]
// modules included with the postgres_ffi macro depend on the types of the specific version's
// types, and trigger a too eager lint.
#![allow(clippy::duplicate_mod)]
// suppress warnings on rust 1.53 due to bindgen unit tests.
// https://github.com/rust-lang/rust-bindgen/issues/1651
#![allow(deref_nullptr)]
use bytes::Bytes;
use utils::bin_ser::SerializeError;


@@ -57,10 +57,12 @@ pub const SIZE_OF_XLOG_RECORD_DATA_HEADER_SHORT: usize = 1 * 2;
/// in order to let CLOG_TRUNCATE mechanism correctly extend CLOG.
const XID_CHECKPOINT_INTERVAL: u32 = 1024;
#[allow(non_snake_case)]
pub fn XLogSegmentsPerXLogId(wal_segsz_bytes: usize) -> XLogSegNo {
(0x100000000u64 / wal_segsz_bytes as u64) as XLogSegNo
}
#[allow(non_snake_case)]
pub fn XLogSegNoOffsetToRecPtr(
segno: XLogSegNo,
offset: u32,
@@ -69,6 +71,7 @@ pub fn XLogSegNoOffsetToRecPtr(
segno * (wal_segsz_bytes as u64) + (offset as u64)
}
#[allow(non_snake_case)]
pub fn XLogFileName(tli: TimeLineID, logSegNo: XLogSegNo, wal_segsz_bytes: usize) -> String {
format!(
"{:>08X}{:>08X}{:>08X}",
@@ -78,6 +81,7 @@ pub fn XLogFileName(tli: TimeLineID, logSegNo: XLogSegNo, wal_segsz_bytes: usize
)
}
#[allow(non_snake_case)]
pub fn XLogFromFileName(fname: &str, wal_seg_size: usize) -> (XLogSegNo, TimeLineID) {
let tli = u32::from_str_radix(&fname[0..8], 16).unwrap();
let log = u32::from_str_radix(&fname[8..16], 16).unwrap() as XLogSegNo;
@@ -85,10 +89,12 @@ pub fn XLogFromFileName(fname: &str, wal_seg_size: usize) -> (XLogSegNo, TimeLin
(log * XLogSegmentsPerXLogId(wal_seg_size) + seg, tli)
}
#[allow(non_snake_case)]
pub fn IsXLogFileName(fname: &str) -> bool {
return fname.len() == XLOG_FNAME_LEN && fname.chars().all(|c| c.is_ascii_hexdigit());
}
#[allow(non_snake_case)]
pub fn IsPartialXLogFileName(fname: &str) -> bool {
fname.ends_with(".partial") && IsXLogFileName(&fname[0..fname.len() - 8])
}


@@ -1,12 +0,0 @@
[package]
name = "safekeeper_api"
version = "0.1.0"
edition = "2021"
[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_with = "1.12.0"
const_format = "0.2.21"
utils = { path = "../utils" }
workspace_hack = { version = "0.1", path = "../../workspace_hack" }


@@ -1,10 +0,0 @@
use const_format::formatcp;
/// Public API types
pub mod models;
pub const DEFAULT_PG_LISTEN_PORT: u16 = 5454;
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 7676;
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");


@@ -58,7 +58,6 @@ rstar = "0.9.3"
num-traits = "0.2.15"
amplify_num = "0.4.1"
pageserver_api = { path = "../libs/pageserver_api" }
postgres_ffi = { path = "../libs/postgres_ffi" }
etcd_broker = { path = "../libs/etcd_broker" }
metrics = { path = "../libs/metrics" }
@@ -67,7 +66,6 @@ remote_storage = { path = "../libs/remote_storage" }
workspace_hack = { version = "0.1", path = "../workspace_hack" }
close_fds = "0.3.2"
walkdir = "2.3.2"
svg_fmt = "0.4.1"
[dev-dependencies]
hex-literal = "0.3"


@@ -1,185 +0,0 @@
use clap::{App, Arg};
use futures::TryFutureExt;
use pageserver::{page_service::PagestreamFeMessage, repository::Key};
use std::{collections::{BTreeMap, BTreeSet, HashMap}, ops::Range, path::PathBuf};
use std::io::Write;
use std::{
fs::{read_dir, File},
io::BufReader,
str::FromStr,
};
use svg_fmt::*;
use utils::{
id::{TenantId, TimelineId},
lsn::Lsn,
pq_proto::{BeMessage, FeMessage},
};
fn analyze<T: Ord + Copy>(coords: Vec<T>) -> (usize, BTreeMap<T, usize>) {
let set: BTreeSet<T> = coords.into_iter().collect();
let mut map: BTreeMap<T, usize> = BTreeMap::new();
for (i, e) in set.iter().enumerate() {
map.insert(*e, i);
}
(set.len(), map)
}
fn main() -> anyhow::Result<()> {
// TODO upgrade to struct macro arg parsing
let arg_matches = App::new("Pageserver trace visualization tool")
.about("Makes a svg file that displays the read pattern")
.arg(
Arg::new("traces_dir")
.takes_value(true)
.help("Directory where the read traces are stored"),
)
.get_matches();
// (blkno, lsn)
let mut dots = Vec::<(u32, Lsn)>::new();
let mut dump_file = File::create("dump.txt").expect("can't make file");
let mut deltas = HashMap::<i32, u32>::new();
let mut prev1: u32 = 0;
let mut prev2: u32 = 0;
let mut prev3: u32 = 0;
println!("scanning trace ...");
let traces_dir = PathBuf::from(arg_matches.value_of("traces_dir").unwrap());
for tenant_dir in read_dir(traces_dir)? {
let entry = tenant_dir?;
let path = entry.path();
// let tenant_id = TenantId::from_str(path.file_name().unwrap().to_str().unwrap())?;
// println!("tenant: {tenant_id}");
println!("opening {path:?}");
for timeline_dir in read_dir(path)? {
let entry = timeline_dir?;
let path = entry.path();
// let timeline_id = TimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?;
// println!("hi");
// println!("timeline: {timeline_id}");
println!("opening {path:?}");
for trace_dir in read_dir(path)? {
let entry = trace_dir?;
let path = entry.path();
// let _conn_id = TimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?;
println!("opening {path:?}");
let file = File::open(path.clone())?;
let mut reader = BufReader::new(file);
while let Ok(msg) = PagestreamFeMessage::parse(&mut reader) {
// println!("Parsed message {:?}", msg);
match msg {
PagestreamFeMessage::Exists(_) => {}
PagestreamFeMessage::Nblocks(_) => {}
PagestreamFeMessage::GetPage(req) => {
writeln!(&mut dump_file, "{} {} {}", req.rel, req.blkno, req.lsn)?;
// dots.push((req.blkno, req.lsn));
// HACK
dots.push((req.blkno, Lsn::from(dots.len() as u64)));
let delta1 = (req.blkno as i32) - (prev1 as i32);
let delta2 = (req.blkno as i32) - (prev2 as i32);
let delta3 = (req.blkno as i32) - (prev3 as i32);
let mut delta = if i32::abs(delta1) < i32::abs(delta2) {
delta1
} else {
delta2
};
if i32::abs(delta3) < i32::abs(delta) {
delta = delta3;
}
let delta = delta1;
prev3 = prev2;
prev2 = prev1;
prev1 = req.blkno;
match deltas.get_mut(&delta) {
Some(c) => {*c += 1;},
None => {deltas.insert(delta, 1);},
};
if delta == 9 {
println!("{} {} {} {}", dots.len(), req.rel, req.blkno, req.lsn);
}
},
PagestreamFeMessage::DbSize(_) => {}
};
// HACK
// if dots.len() > 1000 {
// break;
// }
}
}
}
}
let mut other = deltas.len();
deltas.retain(|_, count| *count > 3);
other -= deltas.len();
dbg!(other);
dbg!(deltas);
// Collect all coordinates
let mut keys: Vec<u32> = vec![];
let mut lsns: Vec<Lsn> = vec![];
for dot in &dots {
keys.push(dot.0);
lsns.push(dot.1);
}
// Analyze
let (key_max, key_map) = analyze(keys);
let (lsn_max, lsn_map) = analyze(lsns);
// Draw
println!("drawing trace ...");
let mut svg_file = File::create("out.svg").expect("can't make file");
writeln!(
&mut svg_file,
"{}",
BeginSvg {
w: (key_max + 1) as f32,
h: (lsn_max + 1) as f32,
}
)?;
for (key, lsn) in &dots {
let key = key_map.get(&key).unwrap();
let lsn = lsn_map.get(&lsn).unwrap();
writeln!(
&mut svg_file,
" {}",
rectangle(
*key as f32,
*lsn as f32,
10.0,
10.0
)
.fill(Fill::Color(red()))
.stroke(Stroke::Color(black(), 0.0))
.border_radius(0.5)
)?;
// println!(" {}",
// rectangle(key_start as f32 + stretch * margin,
// stretch * (lsn_max as f32 - 1.0 - (lsn_start as f32 + margin - lsn_offset)),
// key_diff as f32 - stretch * 2.0 * margin,
// stretch * (lsn_diff - 2.0 * margin))
// // .fill(rgb(200, 200, 200))
// .fill(fill)
// .stroke(Stroke::Color(rgb(200, 200, 200), 0.1))
// .border_radius(0.4)
// );
}
writeln!(&mut svg_file, "{}", EndSvg)?;
Ok(())
}


@@ -1,133 +0,0 @@
use bytes::BytesMut;
use pageserver::page_service::PagestreamFeMessage;
use std::{
fs::{read_dir, File},
io::BufReader,
path::PathBuf,
str::FromStr,
};
use tokio::{io::AsyncWriteExt, net::TcpStream};
use clap::{App, Arg};
use utils::{
id::{TenantId, TimelineId},
pq_proto::{BeMessage, FeMessage},
};
// TODO put this in library, dedup with stuff in control_plane
/// Client for the pageserver's pagestream API
struct PagestreamApi {
stream: TcpStream,
}
impl PagestreamApi {
async fn connect(
connstr: &str,
tenant: &TenantId,
timeline: &TimelineId,
) -> anyhow::Result<PagestreamApi> {
// Parse connstr
let config = tokio_postgres::Config::from_str(connstr).expect("bad connstr");
let tcp_addr = format!("localhost:{}", config.get_ports()[0]);
// Connect
let mut stream = TcpStream::connect(tcp_addr).await?;
let (client, conn) = config
.connect_raw(&mut stream, tokio_postgres::NoTls)
.await?;
// Enter pagestream protocol
let init_query = format!("pagestream {} {}", tenant, timeline);
tokio::select! {
_ = conn => panic!("connection closed during pagestream initialization"),
_ = client.query(init_query.as_str(), &[]) => (),
};
Ok(PagestreamApi { stream })
}
async fn make_request(&mut self, msg: PagestreamFeMessage) -> anyhow::Result<()> {
let request = {
let msg_bytes = msg.serialize();
let mut buf = BytesMut::new();
let copy_msg = BeMessage::CopyData(&msg_bytes);
// TODO it's actually a fe message but it doesn't have a serializer yet
BeMessage::write(&mut buf, &copy_msg)?;
buf.freeze()
};
self.stream.write_all(&request).await?;
// TODO It's actually a be message, but it doesn't have a parser.
// So error response (code b'E' parses incorrectly as FeExecuteMessage)
let _response = match FeMessage::read_fut(&mut self.stream).await? {
Some(FeMessage::CopyData(page)) => page,
r => panic!("Expected CopyData message, got: {:?}", r),
};
Ok(())
}
}
async fn replay_trace<R: std::io::Read>(
reader: &mut R,
mut pagestream: PagestreamApi,
) -> anyhow::Result<()> {
while let Ok(msg) = PagestreamFeMessage::parse(reader) {
println!("Parsed message {:?}", msg);
pagestream.make_request(msg).await?;
}
Ok(())
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// TODO upgrade to struct macro arg parsing
let arg_matches = App::new("Pageserver trace replay tool")
.about("Replays wal or read traces to test pageserver performance")
.arg(
Arg::new("traces_dir")
.takes_value(true)
.help("Directory where the read traces are stored"),
)
.arg(
Arg::new("pageserver_connstr")
.takes_value(true)
.help("Pageserver pg endpoint to connect to"),
)
.get_matches();
let connstr = arg_matches.value_of("pageserver_connstr").unwrap();
let traces_dir = PathBuf::from(arg_matches.value_of("traces_dir").unwrap());
for tenant_dir in read_dir(traces_dir)? {
let entry = tenant_dir?;
let path = entry.path();
let tenant_id = TenantId::from_str(path.file_name().unwrap().to_str().unwrap())?;
for timeline_dir in read_dir(path)? {
let entry = timeline_dir?;
let path = entry.path();
let timeline_id = TimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?;
for trace_dir in read_dir(path)? {
let entry = trace_dir?;
let path = entry.path();
let _conn_id = TimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?;
// TODO The pageserver deletes existing traces?
// LOL yes because I use tenant ID as trace id
let pagestream = PagestreamApi::connect(connstr, &tenant_id, &timeline_id).await?;
let file = File::open(path.clone())?;
let mut reader = BufReader::new(file);
// let len = file.metadata().unwrap().len();
// println!("replaying {:?} trace {} bytes", path, len);
replay_trace(&mut reader, pagestream).await?;
}
}
}
Ok(())
}


@@ -30,10 +30,10 @@ pub mod defaults {
use crate::tenant_config::defaults::*;
use const_format::formatcp;
pub use pageserver_api::{
DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_HTTP_LISTEN_PORT, DEFAULT_PG_LISTEN_ADDR,
DEFAULT_PG_LISTEN_PORT,
};
pub const DEFAULT_PG_LISTEN_PORT: u16 = 64000;
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 9898;
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");
pub const DEFAULT_WAIT_LSN_TIMEOUT: &str = "60 s";
pub const DEFAULT_WAL_REDO_TIMEOUT: &str = "60 s";
@@ -364,23 +364,6 @@ impl PageServerConf {
self.timelines_path(tenant_id).join(timeline_id.to_string())
}
pub fn traces_path(&self) -> PathBuf {
self.workdir.join("traces")
}
pub fn trace_path(
&self,
tenant_id: &TenantId,
timeline_id: &TimelineId,
connection_id: &TimelineId, // TODO make a new type
) -> PathBuf {
self.traces_path()
.join(tenant_id.to_string())
.join(timeline_id.to_string())
.join(connection_id.to_string())
}
/// Points to a place in pageserver's local directory,
/// where certain timeline's metadata file should be located.
pub fn metadata_path(&self, timeline_id: TimelineId, tenant_id: TenantId) -> PathBuf {


@@ -1,4 +1,3 @@
pub mod models;
pub mod routes;
pub use routes::make_router;
pub use pageserver_api::models;


@@ -7,17 +7,7 @@ use utils::{
lsn::Lsn,
};
/// A state of a tenant in pageserver's memory.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum TenantState {
/// Tenant is fully operational, its background jobs might be running or not.
Active { background_jobs_running: bool },
/// A tenant is recognized by pageserver, but not yet ready to operate:
/// e.g. not present locally and being downloaded or being read into memory from the file system.
Paused,
/// A tenant is recognized by the pageserver, but no longer used for any operations, as failed to get activated.
Broken,
}
use crate::tenant::TenantState;
#[serde_as]
#[derive(Serialize, Deserialize)]
@@ -52,7 +42,6 @@ pub struct TenantCreateRequest {
pub walreceiver_connect_timeout: Option<String>,
pub lagging_wal_timeout: Option<String>,
pub max_lsn_wal_lag: Option<NonZeroU64>,
pub trace_read_requests: Option<bool>,
}
#[serde_as]


@@ -337,16 +337,9 @@ async fn tenant_attach_handler(request: Request<Body>) -> Result<Response<Body>,
info!("Handling tenant attach {tenant_id}");
tokio::task::spawn_blocking(move || match tenant_mgr::get_tenant(tenant_id, false) {
Ok(tenant) => {
if tenant.list_timelines().is_empty() {
info!("Attaching to tenant {tenant_id} with zero timelines");
Ok(())
} else {
Err(ApiError::Conflict(
"Tenant is already present locally".to_owned(),
))
}
}
Ok(_) => Err(ApiError::Conflict(
"Tenant is already present locally".to_owned(),
)),
Err(_) => Ok(()),
})
.await
@@ -617,9 +610,6 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
if let Some(max_lsn_wal_lag) = request_data.max_lsn_wal_lag {
tenant_conf.max_lsn_wal_lag = Some(max_lsn_wal_lag);
}
if let Some(trace_read_requests) = request_data.trace_read_requests {
tenant_conf.trace_read_requests = Some(trace_read_requests);
}
tenant_conf.checkpoint_distance = request_data.checkpoint_distance;
if let Some(checkpoint_timeout) = request_data.checkpoint_timeout {


@@ -16,8 +16,6 @@ pub mod tenant;
pub mod tenant_config;
pub mod tenant_mgr;
pub mod tenant_tasks;
// pub mod timelines;
pub mod trace;
pub mod virtual_file;
pub mod walingest;
pub mod walreceiver;


@@ -10,9 +10,7 @@
//
use anyhow::{bail, ensure, Context, Result};
use byteorder::{BigEndian, ReadBytesExt};
use bytes::Buf;
use bytes::{BufMut, Bytes, BytesMut};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use futures::{Stream, StreamExt};
use regex::Regex;
use std::io;
@@ -44,7 +42,6 @@ use crate::task_mgr;
use crate::task_mgr::TaskKind;
use crate::tenant::Timeline;
use crate::tenant_mgr;
use crate::trace::Tracer;
use crate::CheckpointConfig;
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
@@ -52,9 +49,7 @@ use postgres_ffi::to_pg_timestamp;
use postgres_ffi::BLCKSZ;
// Wrapped in libpq CopyData
// TODO these should be in a library outside the pageserver
#[derive(Debug)]
pub enum PagestreamFeMessage {
enum PagestreamFeMessage {
Exists(PagestreamExistsRequest),
Nblocks(PagestreamNblocksRequest),
GetPage(PagestreamGetPageRequest),
@@ -62,7 +57,7 @@ pub enum PagestreamFeMessage {
}
// Wrapped in libpq CopyData
pub enum PagestreamBeMessage {
enum PagestreamBeMessage {
Exists(PagestreamExistsResponse),
Nblocks(PagestreamNblocksResponse),
GetPage(PagestreamGetPageResponse),
@@ -71,152 +66,106 @@ pub enum PagestreamBeMessage {
}
#[derive(Debug)]
pub struct PagestreamExistsRequest {
struct PagestreamExistsRequest {
latest: bool,
lsn: Lsn,
rel: RelTag,
}
#[derive(Debug)]
pub struct PagestreamNblocksRequest {
struct PagestreamNblocksRequest {
latest: bool,
lsn: Lsn,
rel: RelTag,
}
#[derive(Debug)]
pub struct PagestreamGetPageRequest {
struct PagestreamGetPageRequest {
latest: bool,
pub lsn: Lsn,
pub rel: RelTag,
pub blkno: u32,
lsn: Lsn,
rel: RelTag,
blkno: u32,
}
#[derive(Debug)]
pub struct PagestreamDbSizeRequest {
struct PagestreamDbSizeRequest {
latest: bool,
lsn: Lsn,
dbnode: u32,
}
#[derive(Debug)]
pub struct PagestreamExistsResponse {
struct PagestreamExistsResponse {
exists: bool,
}
#[derive(Debug)]
pub struct PagestreamNblocksResponse {
struct PagestreamNblocksResponse {
n_blocks: u32,
}
#[derive(Debug)]
pub struct PagestreamGetPageResponse {
struct PagestreamGetPageResponse {
page: Bytes,
}
#[derive(Debug)]
pub struct PagestreamErrorResponse {
struct PagestreamErrorResponse {
message: String,
}
#[derive(Debug)]
pub struct PagestreamDbSizeResponse {
struct PagestreamDbSizeResponse {
db_size: i64,
}
impl PagestreamFeMessage {
pub fn serialize(&self) -> Bytes {
let mut bytes = BytesMut::new();
match self {
Self::Exists(req) => {
bytes.put_u8(0);
bytes.put_u8(if req.latest { 1 } else { 0 });
bytes.put_u64(req.lsn.0);
bytes.put_u32(req.rel.spcnode);
bytes.put_u32(req.rel.dbnode);
bytes.put_u32(req.rel.relnode);
bytes.put_u8(req.rel.forknum);
}
Self::Nblocks(req) => {
bytes.put_u8(1);
bytes.put_u8(if req.latest { 1 } else { 0 });
bytes.put_u64(req.lsn.0);
bytes.put_u32(req.rel.spcnode);
bytes.put_u32(req.rel.dbnode);
bytes.put_u32(req.rel.relnode);
bytes.put_u8(req.rel.forknum);
}
Self::GetPage(req) => {
bytes.put_u8(2);
bytes.put_u8(if req.latest { 1 } else { 0 });
bytes.put_u64(req.lsn.0);
bytes.put_u32(req.rel.spcnode);
bytes.put_u32(req.rel.dbnode);
bytes.put_u32(req.rel.relnode);
bytes.put_u8(req.rel.forknum);
bytes.put_u32(req.blkno);
}
Self::DbSize(req) => {
bytes.put_u8(3);
bytes.put_u8(if req.latest { 1 } else { 0 });
bytes.put_u64(req.lsn.0);
bytes.put_u32(req.dbnode);
}
}
bytes.into()
}
pub fn parse<R: std::io::Read>(body: &mut R) -> anyhow::Result<PagestreamFeMessage> {
fn parse(mut body: Bytes) -> anyhow::Result<PagestreamFeMessage> {
// TODO these gets can fail
// these correspond to the NeonMessageTag enum in pagestore_client.h
//
// TODO: consider using protobuf or serde bincode for less error prone
// serialization.
let msg_tag = body.read_u8()?;
let msg_tag = body.get_u8();
match msg_tag {
0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
latest: body.get_u8() != 0,
lsn: Lsn::from(body.get_u64()),
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
spcnode: body.get_u32(),
dbnode: body.get_u32(),
relnode: body.get_u32(),
forknum: body.get_u8(),
},
})),
1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
latest: body.get_u8() != 0,
lsn: Lsn::from(body.get_u64()),
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
spcnode: body.get_u32(),
dbnode: body.get_u32(),
relnode: body.get_u32(),
forknum: body.get_u8(),
},
})),
2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
latest: body.get_u8() != 0,
lsn: Lsn::from(body.get_u64()),
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
spcnode: body.get_u32(),
dbnode: body.get_u32(),
relnode: body.get_u32(),
forknum: body.get_u8(),
},
blkno: body.read_u32::<BigEndian>()?,
blkno: body.get_u32(),
})),
3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
dbnode: body.read_u32::<BigEndian>()?,
latest: body.get_u8() != 0,
lsn: Lsn::from(body.get_u64()),
dbnode: body.get_u32(),
})),
_ => bail!("unknown smgr message tag: {:?}", msg_tag),
_ => bail!("unknown smgr message tag: {},'{:?}'", msg_tag, body),
}
}
}
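The byte layout that `parse` decodes is easiest to see from the sending side. A hedged Python sketch that packs a GetPage request (tag 2) with the same big-endian field order as the parser above; the helper name and example values are made up for illustration:

```python
import struct

def pack_getpage_request(latest: bool, lsn: int, spcnode: int, dbnode: int,
                         relnode: int, forknum: int, blkno: int) -> bytes:
    # Field order mirrors the parser: tag (u8) = 2, latest (u8), lsn (u64),
    # rel tag as (spcnode u32, dbnode u32, relnode u32, forknum u8), then blkno (u32),
    # all big-endian.
    return struct.pack(
        ">BBQIIIBI",
        2, 1 if latest else 0, lsn, spcnode, dbnode, relnode, forknum, blkno,
    )

# Example: block 0 of pg_class (relnode 1259) in a hypothetical database (oid 16384),
# default tablespace (1663), main fork (0), at an arbitrary LSN.
msg = pack_getpage_request(True, 0x16B9188, 1663, 16384, 1259, 0, 0)
assert len(msg) == 27
```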
@@ -473,17 +422,6 @@ impl PageServerHandler {
// so there is no need to reset the association
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
// Make request tracer if needed
let tenant = tenant_mgr::get_tenant(tenant_id, true)?;
let mut tracer = if tenant.get_trace_read_requests() {
let path = tenant
.conf
.trace_path(&tenant_id, &timeline_id, &TimelineId::generate());
Some(Tracer::new(path))
} else {
None
};
// Check that the timeline exists
let timeline = get_local_timeline(tenant_id, timeline_id)?;
@@ -508,24 +446,15 @@ impl PageServerHandler {
let copy_data_bytes = match msg? {
Some(FeMessage::CopyData(bytes)) => bytes,
Some(FeMessage::Sync) => {
// TODO what now?
continue;
}
Some(m) => {
bail!("unexpected message: {m:?} during COPY");
}
None => break, // client disconnected
};
// Trace request if needed
if let Some(t) = tracer.as_mut() {
t.trace(&copy_data_bytes)
}
trace!("query: {copy_data_bytes:?}");
let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
let neon_fe_msg = PagestreamFeMessage::parse(copy_data_bytes)?;
let response = match neon_fe_msg {
PagestreamFeMessage::Exists(req) => {


@@ -639,7 +639,6 @@ pub fn spawn_storage_sync_task(
(storage, remote_index_clone, sync_queue),
max_sync_errors,
)
.instrument(info_span!("storage_sync_loop"))
.await;
Ok(())
},


@@ -45,7 +45,6 @@ use crate::tenant_config::TenantConfOpt;
use crate::virtual_file::VirtualFile;
use crate::walredo::WalRedoManager;
use crate::{CheckpointConfig, TEMP_FILE_SUFFIX};
pub use pageserver_api::models::TenantState;
use toml_edit;
use utils::{
@@ -119,6 +118,18 @@ pub struct Tenant {
upload_layers: bool,
}
/// A state of a tenant in pageserver's memory.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum TenantState {
/// Tenant is fully operational, its background jobs might be running or not.
Active { background_jobs_running: bool },
/// A tenant is recognized by pageserver, but not yet ready to operate:
/// e.g. not present locally and being downloaded or being read into memory from the file system.
Paused,
/// A tenant is recognized by the pageserver, but no longer used for any operations, as failed to get activated.
Broken,
}
/// A repository corresponds to one .neon directory. One repository holds multiple
/// timelines, forked off from the same initial call to 'initdb'.
impl Tenant {
@@ -389,19 +400,16 @@ impl Tenant {
timeline_id,
metadata.pg_version()
);
let ancestor = metadata
.ancestor_timeline()
.and_then(|ancestor_timeline_id| timelines_accessor.get(&ancestor_timeline_id))
.cloned();
match timelines_accessor.entry(timeline_id) {
Entry::Occupied(_) => warn!(
"Timeline {}/{} already exists in the tenant map, skipping its initialization",
self.tenant_id, timeline_id
let timeline = self
.initialize_new_timeline(timeline_id, metadata, &mut timelines_accessor)
.with_context(|| format!("Failed to initialize timeline {timeline_id}"))?;
match timelines_accessor.entry(timeline.timeline_id) {
Entry::Occupied(_) => bail!(
"Found freshly initialized timeline {} in the tenant map",
timeline.timeline_id
),
Entry::Vacant(v) => {
let timeline = self
.initialize_new_timeline(timeline_id, metadata, ancestor)
.with_context(|| format!("Failed to initialize timeline {timeline_id}"))?;
v.insert(timeline);
}
}
@@ -593,14 +601,6 @@ impl Tenant {
.unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag)
}
// TODO is there a need for all this boilerplate?
pub fn get_trace_read_requests(&self) -> bool {
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.trace_read_requests
.unwrap_or(self.conf.default_tenant_conf.trace_read_requests)
}
pub fn update_tenant_config(&self, new_tenant_conf: TenantConfOpt) {
self.tenant_conf.write().unwrap().update(&new_tenant_conf);
}
@@ -609,14 +609,21 @@ impl Tenant {
&self,
new_timeline_id: TimelineId,
new_metadata: TimelineMetadata,
ancestor: Option<Arc<Timeline>>,
timelines: &mut MutexGuard<HashMap<TimelineId, Arc<Timeline>>>,
) -> anyhow::Result<Arc<Timeline>> {
if let Some(ancestor_timeline_id) = new_metadata.ancestor_timeline() {
anyhow::ensure!(
ancestor.is_some(),
"Timeline's {new_timeline_id} ancestor {ancestor_timeline_id} was not found"
)
}
let ancestor = match new_metadata.ancestor_timeline() {
Some(ancestor_timeline_id) => Some(
timelines
.get(&ancestor_timeline_id)
.cloned()
.with_context(|| {
format!(
"Timeline's {new_timeline_id} ancestor {ancestor_timeline_id} was not found"
)
})?,
),
None => None,
};
let new_disk_consistent_lsn = new_metadata.disk_consistent_lsn();
let pg_version = new_metadata.pg_version();
@@ -1073,12 +1080,8 @@ impl Tenant {
)
})?;
let ancestor = new_metadata
.ancestor_timeline()
.and_then(|ancestor_timeline_id| timelines.get(&ancestor_timeline_id))
.cloned();
let new_timeline = self
.initialize_new_timeline(new_timeline_id, new_metadata, ancestor)
.initialize_new_timeline(new_timeline_id, new_metadata, timelines)
.with_context(|| {
format!(
"Failed to initialize timeline {}/{}",


@@ -627,7 +627,7 @@ impl Timeline {
.unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag);
drop(tenant_conf_guard);
let self_clone = Arc::clone(self);
spawn_connection_manager_task(
let _ = spawn_connection_manager_task(
self.conf.broker_etcd_prefix.clone(),
self_clone,
walreceiver_connect_timeout,


@@ -82,10 +82,6 @@ pub struct TenantConf {
/// A lagging safekeeper will be changed after `lagging_wal_timeout` time elapses since the last WAL update,
/// to avoid eager reconnects.
pub max_lsn_wal_lag: NonZeroU64,
/// If enabled, records all read requests for this ... TODO
/// Even though read traces are small, there's no delete mechanism so they can
/// pile up. So we disable this by default.
pub trace_read_requests: bool,
}
/// Same as TenantConf, but this struct preserves the information about
@@ -109,7 +105,6 @@ pub struct TenantConfOpt {
#[serde(with = "humantime_serde")]
pub lagging_wal_timeout: Option<Duration>,
pub max_lsn_wal_lag: Option<NonZeroU64>,
pub trace_read_requests: Option<bool>,
}
impl TenantConfOpt {
@@ -143,9 +138,6 @@ impl TenantConfOpt {
.lagging_wal_timeout
.unwrap_or(global_conf.lagging_wal_timeout),
max_lsn_wal_lag: self.max_lsn_wal_lag.unwrap_or(global_conf.max_lsn_wal_lag),
trace_read_requests: self
.trace_read_requests
.unwrap_or(global_conf.trace_read_requests),
}
}
@@ -215,7 +207,6 @@ impl TenantConf {
.expect("cannot parse default walreceiver lagging wal timeout"),
max_lsn_wal_lag: NonZeroU64::new(DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
.expect("cannot parse default max walreceiver Lsn wal lag"),
trace_read_requests: false,
}
}
@@ -241,7 +232,6 @@ impl TenantConf {
.unwrap(),
max_lsn_wal_lag: NonZeroU64::new(defaults::DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
.unwrap(),
trace_read_requests: false,
}
}
}


@@ -107,13 +107,6 @@ pub fn init_tenant_mgr(
/// Ignores other timelines that might be present for tenant, but were not passed as a parameter.
/// Attempts to load as many entities as possible: if a certain timeline fails during the load, the tenant is marked as "Broken",
/// and the load continues.
///
/// For successful tenant attach, it first has to have a `timelines/` subdirectory and a tenant config file that's loaded into memory successfully.
/// If either of the conditions fails, the tenant will be added to memory with [`TenantState::Broken`] state, otherwise we start to load its timelines.
/// Alternatively, tenant is considered loaded successfully, if it's already in pageserver's memory (i.e. was loaded already before).
///
/// Attach happens on startup and on successful timeline downloads
/// (some subset of timeline files, always including its metadata, after which the new one needs to be registered).
pub fn attach_local_tenants(
conf: &'static PageServerConf,
remote_index: &RemoteIndex,
@@ -129,20 +122,18 @@ pub fn attach_local_tenants(
);
debug!("Timelines to attach: {local_timelines:?}");
let mut tenants_accessor = tenants_state::write_tenants();
let tenant = match tenants_accessor.entry(tenant_id) {
hash_map::Entry::Occupied(o) => {
info!("Tenant {tenant_id} was found in pageserver's memory");
Arc::clone(o.get())
let tenant = load_local_tenant(conf, tenant_id, remote_index);
{
match tenants_state::write_tenants().entry(tenant_id) {
hash_map::Entry::Occupied(_) => {
error!("Cannot attach tenant {tenant_id}: there's already an entry in the tenant state");
continue;
}
hash_map::Entry::Vacant(v) => {
v.insert(Arc::clone(&tenant));
}
}
hash_map::Entry::Vacant(v) => {
info!("Tenant {tenant_id} was not found in pageserver's memory, loading it");
let tenant = load_local_tenant(conf, tenant_id, remote_index);
v.insert(Arc::clone(&tenant));
tenant
}
};
drop(tenants_accessor);
}
if tenant.current_state() == TenantState::Broken {
warn!("Skipping timeline load for broken tenant {tenant_id}")
@@ -177,28 +168,16 @@ fn load_local_tenant(
remote_index.clone(),
conf.remote_storage_config.is_some(),
));
let tenant_timelines_dir = conf.timelines_path(&tenant_id);
if !tenant_timelines_dir.is_dir() {
error!(
"Tenant {} has no timelines directory at {}",
tenant_id,
tenant_timelines_dir.display()
);
tenant.set_state(TenantState::Broken);
} else {
match Tenant::load_tenant_config(conf, tenant_id) {
Ok(tenant_conf) => {
tenant.update_tenant_config(tenant_conf);
tenant.activate(false);
}
Err(e) => {
error!("Failed to read config for tenant {tenant_id}, disabling tenant: {e:?}");
tenant.set_state(TenantState::Broken);
}
match Tenant::load_tenant_config(conf, tenant_id) {
Ok(tenant_conf) => {
tenant.update_tenant_config(tenant_conf);
tenant.activate(false);
}
Err(e) => {
error!("Failed to read config for tenant {tenant_id}, disabling tenant: {e:?}");
tenant.set_state(TenantState::Broken);
}
}
tenant
}
@@ -646,10 +625,14 @@ fn collect_timelines_for_tenant(
}
if tenant_timelines.is_empty() {
// this is normal, we've removed all broken, empty and temporary timeline dirs
// but should allow the tenant to stay functional and allow creating new timelines
// on a restart, we require tenants to have the timelines dir, so leave it on disk
debug!("Tenant {tenant_id} has no timelines loaded");
match remove_if_empty(&timelines_dir) {
Ok(true) => info!(
"Removed empty tenant timelines directory {}",
timelines_dir.display()
),
Ok(false) => (),
Err(e) => error!("Failed to remove empty tenant timelines directory: {e:?}"),
}
}
Ok((tenant_id, tenant_timelines))


@@ -1,36 +0,0 @@
use bytes::Bytes;
use std::{
fs::{create_dir_all, File},
io::{BufWriter, Write},
path::PathBuf,
};
pub struct Tracer {
writer: BufWriter<File>,
}
impl Drop for Tracer {
fn drop(&mut self) {
self.flush()
}
}
impl Tracer {
pub fn new(path: PathBuf) -> Self {
let parent = path.parent().expect("failed to parse parent path");
create_dir_all(parent).expect("failed to create trace dir");
let file = File::create(path).expect("failed to create trace file");
Tracer {
writer: BufWriter::new(file),
}
}
pub fn trace(&mut self, msg: &Bytes) {
self.writer.write(msg).expect("failed to write trace");
}
pub fn flush(&mut self) {
self.writer.flush().expect("failed to flush trace file");
}
}


@@ -264,76 +264,12 @@ pageserver_flush(void)
}
}
// Entry of the page_cache
typedef struct
{
// HACK Just xor of bytes lol
char request_hash;
// Points directly to a NeonResponse. We can't just own the
// NeonResponse because it's a "supertype", so it's not Sized.
char* response;
int len;
} NeonRequestResponse;
#define MAX_PAGE_CACHE_SIZE 50
NeonRequestResponse page_cache[MAX_PAGE_CACHE_SIZE];
int page_cache_size = 0;
int page_cache_head = 0;
static NeonResponse *
pageserver_call(NeonRequest * request)
{
// Compute hash
char hash = 0;
StringInfoData req_buff;
req_buff = nm_pack_request(request);
for (int i = 0; i < req_buff.len; i++) {
hash ^= req_buff.data[i];
}
pfree(req_buff.data);
// If result is cached, memcpy and return
for (int i = 0; i < page_cache_size; i++) {
if (page_cache[i].request_hash == hash) {
int len = page_cache[0].response->len;
NeonResponse *resp = palloc0(len);
// I'd rather Rc than memcpy, but this is not rust :(
memcpy(resp, page_cache[0].response->data, len);
elog(LOG, "cache hit !!!");
return resp;
}
}
// Send request, get response
pageserver_send(request);
pageserver_flush();
NeonResponse *resp = pageserver_receive();
// Get length
int len = -1;
switch (resp->tag) {
case T_NeonExistsResponse: { len = sizeof(NeonExistsResponse); }
case T_NeonNblocksResponse: { len = sizeof(NeonNblocksResponse); }
case T_NeonGetPageResponse: { len = offsetof(NeonGetPageResponse, page) + BLCKSZ; }
case T_NeonDbSizeResponse: { len = sizeof(NeonDbSizeResponse); }
case T_NeonErrorResponse: { len = sizeof(NeonErrorResponse); }
}
// Cache result
page_cache[page_cache_head].request_hash = hash;
if (page_cache_head < page_cache_size) {
pfree(page_cache[page_cache_head].response);
}
page_cache[page_cache_head].len = len;
page_cache[page_cache_head].response = MemoryContextAlloc(TopMemoryContext, len);
memcpy(page_cache[page_cache_head].response, resp, len);
page_cache_head = (page_cache_head + 1) % MAX_PAGE_CACHE_SIZE;
if (page_cache_size < MAX_PAGE_CACHE_SIZE) {
page_cache_size += 1;
}
return resp;
return pageserver_receive();
}
page_server_api api = {


@@ -129,13 +129,13 @@ mod tests {
assert!(CANCEL_MAP.contains(&session));
tx.send(()).expect("failed to send");
futures::future::pending::<()>().await; // sleep forever
let () = futures::future::pending().await; // sleep forever
Ok(())
}));
// Wait until the task has been spawned.
rx.await.context("failed to hear from the task")?;
let () = rx.await.context("failed to hear from the task")?;
// Drop the session's entry by cancelling the task.
task.abort();


@@ -5,7 +5,6 @@ filterwarnings =
ignore:record_property is incompatible with junit_family:pytest.PytestWarning
addopts =
-m 'not remote_cluster'
--ignore=test_runner/performance
markers =
remote_cluster
testpaths =


@@ -1,10 +1,11 @@
[toolchain]
# We try to stick to a toolchain version that is widely available on popular distributions, so that most people
# can use the toolchain that comes with their operating system. But if there's a feature we miss badly from a later
# version, we can consider updating.
# See https://tracker.debian.org/pkg/rustc for more details on Debian rustc package,
# we use "unstable" version number as the highest version used in the project by default.
channel = "1.61" # do update GitHub CI cache values for rust builds, when changing this value
# version, we can consider updating. As of this writing, 1.60 is available on Debian 'experimental' but not yet on
# 'testing' or even 'unstable', which is a bit more cutting-edge than we'd like. Hopefully the 1.60 packages reach
# 'testing' soon (and similarly for the other distributions).
# See https://tracker.debian.org/pkg/rustc for more details on Debian rustc package.
channel = "1.60" # do update GitHub CI cache values for rust builds, when changing this value
profile = "default"
# The default profile includes rustc, rust-std, cargo, rust-docs, rustfmt and clippy.
# https://rust-lang.github.io/rustup/concepts/profiles.html


@@ -33,7 +33,6 @@ toml_edit = { version = "0.13", features = ["easy"] }
thiserror = "1"
parking_lot = "0.12.1"
safekeeper_api = { path = "../libs/safekeeper_api" }
postgres_ffi = { path = "../libs/postgres_ffi" }
metrics = { path = "../libs/metrics" }
utils = { path = "../libs/utils" }


@@ -1,4 +1,3 @@
pub mod models;
pub mod routes;
pub use routes::make_router;
pub use safekeeper_api::models;


@@ -27,13 +27,14 @@ mod timelines_global_map;
pub use timelines_global_map::GlobalTimelines;
pub mod defaults {
use const_format::formatcp;
use std::time::Duration;
pub use safekeeper_api::{
DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_HTTP_LISTEN_PORT, DEFAULT_PG_LISTEN_ADDR,
DEFAULT_PG_LISTEN_PORT,
};
pub const DEFAULT_PG_LISTEN_PORT: u16 = 5454;
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 7676;
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");
pub const DEFAULT_RECALL_PERIOD: Duration = Duration::from_secs(10);
pub const DEFAULT_WAL_BACKUP_RUNTIME_THREADS: usize = 8;
}


@@ -56,14 +56,6 @@ If you want to run all tests that have the string "bench" in their names:
`./scripts/pytest -k bench`
To run tests in parallel we utilize the `pytest-xdist` plugin. By default everything runs single-threaded. The number of workers can be specified with the `-n` argument:
`./scripts/pytest -n4`
By default performance tests are excluded. To run them explicitly pass performance tests selection to the script:
`./scripts/pytest test_runner/performance`
Useful environment variables:
`NEON_BIN`: The directory where neon binaries can be found.
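As a concrete illustration of the options above, here is a sketch of a local invocation that runs the performance selection on four workers against pre-built binaries; the path is a placeholder, and driving the wrapper from Python is just one way to set the environment:
import os
import subprocess

# NEON_BIN points at a directory with pre-built neon binaries (placeholder path).
env = dict(os.environ, NEON_BIN="/path/to/neon/target/release")
subprocess.run(
    ["./scripts/pytest", "-n4", "test_runner/performance"],  # four xdist workers, performance selection
    env=env,
    check=True,
)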

View File

@@ -91,25 +91,10 @@ class NeonCompare(PgCompare):
self._pg_bin = pg_bin
self.pageserver_http_client = self.env.pageserver.http_client()
self.tenant, _ = self.env.neon_cli.create_tenant(
conf={
"trace_read_requests": "true",
}
)
# HACK enable request tracing, as an experiment
# NOTE This must be done before the pg starts pagestream
# TODO why does it not work?
# self.env.neon_cli.config_tenant(self.env.initial_tenant, conf={
# "trace_read_requests": "true",
# })
# We only use one branch and one timeline
# self.env.neon_cli.create_branch(branch_name, "empty", tenant_id=self.tenant)
# self.timeline = self.pg.safe_psql("SHOW neon.timeline_id")[0][0]
self.timeline = self.env.neon_cli.create_timeline(branch_name, tenant_id=self.tenant)
self._pg = self.env.postgres.create_start(
branch_name, "main", self.tenant, config_lines=["shared_buffers=2GB"])
self.env.neon_cli.create_branch(branch_name, "empty")
self._pg = self.env.postgres.create_start(branch_name)
self.timeline = self.pg.safe_psql("SHOW neon.timeline_id")[0][0]
@property
def pg(self):
@@ -124,10 +109,10 @@ class NeonCompare(PgCompare):
return self._pg_bin
def flush(self):
self.pageserver_http_client.timeline_gc(self.tenant, self.timeline, 0)
self.pageserver_http_client.timeline_gc(self.env.initial_tenant, self.timeline, 0)
def compact(self):
self.pageserver_http_client.timeline_compact(self.tenant, self.timeline)
self.pageserver_http_client.timeline_compact(self.env.initial_tenant, self.timeline)
def report_peak_memory_use(self) -> None:
self.zenbenchmark.record(
@@ -139,7 +124,7 @@ class NeonCompare(PgCompare):
def report_size(self) -> None:
timeline_size = self.zenbenchmark.get_timeline_size(
self.env.repo_dir, self.tenant, self.timeline
self.env.repo_dir, self.env.initial_tenant, self.timeline
)
self.zenbenchmark.record(
"size", timeline_size / (1024 * 1024), "MB", report=MetricReport.LOWER_IS_BETTER

View File

@@ -455,9 +455,6 @@ class RemoteStorageKind(enum.Enum):
LOCAL_FS = "local_fs"
MOCK_S3 = "mock_s3"
REAL_S3 = "real_s3"
# Pass to tests that are generic to remote storage
# to ensure the tests pass with or without remote storage
NOOP = "noop"
def available_remote_storages() -> List[RemoteStorageKind]:
@@ -586,9 +583,7 @@ class NeonEnvBuilder:
test_name: str,
force_enable: bool = True,
):
if remote_storage_kind == RemoteStorageKind.NOOP:
return
elif remote_storage_kind == RemoteStorageKind.LOCAL_FS:
if remote_storage_kind == RemoteStorageKind.LOCAL_FS:
self.enable_local_fs_remote_storage(force_enable=force_enable)
elif remote_storage_kind == RemoteStorageKind.MOCK_S3:
self.enable_mock_s3_remote_storage(bucket_name=test_name, force_enable=force_enable)
@@ -1749,37 +1744,6 @@ def pg_bin(test_output_dir: Path, pg_version: str) -> PgBin:
return PgBin(test_output_dir, pg_version)
@dataclass
class ReplayBin:
"""A helper class for replaying pageserver wal and read traces."""
traces_dir: str
def replay_all(self, pageserver_connstr):
replay_binpath = os.path.join(str(neon_binpath), "replay")
args = [
replay_binpath,
self.traces_dir,
pageserver_connstr,
]
# return subprocess.run(args, capture_output=True).stdout.decode("UTF-8").strip()
subprocess.run(args)
def draw_all(self):
draw_binpath = os.path.join(str(neon_binpath), "draw_trace")
args = [
draw_binpath,
self.traces_dir,
]
subprocess.run(args)
@pytest.fixture(scope="function")
def replay_bin(test_output_dir):
traces_dir = os.path.join(test_output_dir, "repo", "traces")
return ReplayBin(traces_dir)
class VanillaPostgres(PgProtocol):
def __init__(self, pgdatadir: Path, pg_bin: PgBin, port: int, init=True):
super().__init__(host="localhost", port=port, dbname="postgres")

View File

@@ -175,11 +175,6 @@ def test_pgbench(neon_with_baseline: PgCompare, scale: int, duration: int):
run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.SELECT_ONLY)
@pytest.mark.parametrize("scale", get_scales_matrix())
@pytest.mark.parametrize("duration", get_durations_matrix())
def test_pgbench_init(neon_with_baseline: PgCompare, scale: int, duration: int):
run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.INIT)
# Run the pgbench tests, and generate a flamegraph from it
# This requires that the pageserver was built with the 'profiling' feature.
#

View File

@@ -1,57 +0,0 @@
from contextlib import closing
from fixtures.benchmark_fixture import NeonBenchmarker
from fixtures.neon_fixtures import NeonEnvBuilder, ReplayBin
# This test is a demonstration of how to do trace playback. With the current
# workload it uses, it's not testing anything meaningful.
def test_trace_replay(
neon_env_builder: NeonEnvBuilder, replay_bin: ReplayBin, zenbenchmark: NeonBenchmarker
):
neon_env_builder.num_safekeepers = 1
env = neon_env_builder.init_start()
tenant, _ = env.neon_cli.create_tenant(
conf={
"trace_read_requests": "true",
}
)
# TODO This doesn't work because I haven't updated tenant_config_handler
# env.neon_cli.config_tenant(tenant, conf={
# "trace_read_requests": "true",
# })
env.neon_cli.create_timeline("test_trace_replay", tenant_id=tenant)
pg = env.postgres.create_start("test_trace_replay", "main", tenant)
with zenbenchmark.record_duration("run"):
pg.safe_psql("select 1;")
pg.safe_psql("select 1;")
pg.safe_psql("select 1;")
pg.safe_psql("select 1;")
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute("create table t (i integer);")
cur.execute(f"insert into t values (generate_series(1,{10000}));")
cur.execute("select count(*) from t;")
# Stop pg so we drop the connection and flush the traces
pg.stop()
# TODO This doesn't work because I haven't updated tenant_config_handler
# env.neon_cli.config_tenant(tenant, conf={
# "trace_read_requests": "false",
# })
# trace_path = env.repo_dir / "traces" / str(tenant) / str(timeline) / str(timeline)
# assert trace_path.exists()
replay_bin.draw_all()
return
print("replaying")
ps_connstr = env.pageserver.connstr()
with zenbenchmark.record_duration("replay"):
output = replay_bin.replay_all(ps_connstr)
print(output)

View File

@@ -1,7 +1,6 @@
from contextlib import closing
import psycopg2.extras
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder

View File

@@ -1,5 +1,4 @@
import os
import shutil
from contextlib import closing
from datetime import datetime
from pathlib import Path
@@ -8,13 +7,8 @@ from typing import List
import pytest
from fixtures.log_helper import log
from fixtures.metrics import PAGESERVER_PER_TENANT_METRICS, parse_metrics
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
RemoteStorageKind,
available_remote_storages,
)
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder
from fixtures.types import Lsn, TenantId
from prometheus_client.samples import Sample
@@ -207,63 +201,3 @@ def test_pageserver_metrics_removed_after_detach(neon_env_builder: NeonEnvBuilde
post_detach_samples = set([x.name for x in get_ps_metric_samples_for_tenant(tenant)])
assert post_detach_samples == set()
# Check that empty tenants work with or without remote storage
@pytest.mark.parametrize(
"remote_storage_kind", available_remote_storages() + [RemoteStorageKind.NOOP]
)
def test_pageserver_with_empty_tenants(
neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind
):
neon_env_builder.enable_remote_storage(
remote_storage_kind=remote_storage_kind,
test_name="test_pageserver_with_empty_tenants",
)
env = neon_env_builder.init_start()
client = env.pageserver.http_client()
tenant_without_timelines_dir = env.initial_tenant
log.info(
f"Tenant {tenant_without_timelines_dir} becomes broken: it abnormally looses tenants/ directory and is expected to be completely ignored when pageserver restarts"
)
shutil.rmtree(Path(env.repo_dir) / "tenants" / str(tenant_without_timelines_dir) / "timelines")
tenant_with_empty_timelines_dir = client.tenant_create()
log.info(
f"Tenant {tenant_with_empty_timelines_dir} gets all of its timelines deleted: still should be functional"
)
temp_timelines = client.timeline_list(tenant_with_empty_timelines_dir)
for temp_timeline in temp_timelines:
client.timeline_delete(
tenant_with_empty_timelines_dir, TimelineId(temp_timeline["timeline_id"])
)
files_in_timelines_dir = sum(
1
for _p in Path.iterdir(
Path(env.repo_dir) / "tenants" / str(tenant_with_empty_timelines_dir) / "timelines"
)
)
assert (
files_in_timelines_dir == 0
), f"Tenant {tenant_with_empty_timelines_dir} should have an empty timelines/ directory"
# Trigger timeline reinitialization after pageserver restart
env.postgres.stop_all()
env.pageserver.stop()
env.pageserver.start()
client = env.pageserver.http_client()
tenants = client.tenant_list()
assert (
len(tenants) == 1
), "Pageserver should attach only tenants with empty timelines/ dir on restart"
loaded_tenant = tenants[0]
assert loaded_tenant["id"] == str(
tenant_with_empty_timelines_dir
), f"Tenant {tenant_with_empty_timelines_dir} should be loaded as the only one with tenants/ directory"
assert loaded_tenant["state"] == {
"Active": {"background_jobs_running": False}
}, "Empty tenant should be loaded and ready for timeline creation"

View File

@@ -7,25 +7,19 @@
#
import asyncio
import os
from pathlib import Path
from typing import List, Tuple
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
NeonPageserverHttpClient,
Postgres,
RemoteStorageKind,
available_remote_storages,
wait_for_last_record_lsn,
wait_for_upload,
wait_until,
)
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import query_scalar
async def tenant_workload(env: NeonEnv, pg: Postgres):
@@ -99,93 +93,3 @@ def test_tenants_many(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Rem
# run final checkpoint manually to flush all the data to remote storage
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
wait_for_upload(pageserver_http, tenant_id, timeline_id, current_lsn)
@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS])
def test_tenants_attached_after_download(
neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind
):
neon_env_builder.enable_remote_storage(
remote_storage_kind=remote_storage_kind,
test_name="test_tenants_attached_after_download",
)
data_id = 1
data_secret = "very secret secret"
##### First start, insert secret data and upload it to the remote storage
env = neon_env_builder.init_start()
pageserver_http = env.pageserver.http_client()
pg = env.postgres.create_start("main")
client = env.pageserver.http_client()
tenant_id = TenantId(pg.safe_psql("show neon.tenant_id")[0][0])
timeline_id = TimelineId(pg.safe_psql("show neon.timeline_id")[0][0])
for checkpoint_number in range(1, 3):
with pg.cursor() as cur:
cur.execute(
f"""
CREATE TABLE t{checkpoint_number}(id int primary key, secret text);
INSERT INTO t{checkpoint_number} VALUES ({data_id}, '{data_secret}|{checkpoint_number}');
"""
)
current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
# wait until pageserver receives that data
wait_for_last_record_lsn(client, tenant_id, timeline_id, current_lsn)
# run checkpoint manually to be sure that data landed in remote storage
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
log.info(f"waiting for checkpoint {checkpoint_number} upload")
# wait until pageserver successfully uploaded a checkpoint to remote storage
wait_for_upload(client, tenant_id, timeline_id, current_lsn)
log.info(f"upload of checkpoint {checkpoint_number} is done")
##### Stop the pageserver, erase its layer file to force it being downloaded from S3
env.postgres.stop_all()
env.pageserver.stop()
timeline_dir = Path(env.repo_dir) / "tenants" / str(tenant_id) / "timelines" / str(timeline_id)
local_layer_deleted = False
for path in Path.iterdir(timeline_dir):
if path.name.startswith("00000"):
# Looks like a layer file. Remove it
os.remove(path)
local_layer_deleted = True
break
assert local_layer_deleted, f"Found no local layer files to delete in directory {timeline_dir}"
##### Start the pageserver, forcing it to download the layer file and load the timeline into memory
env.pageserver.start()
client = env.pageserver.http_client()
wait_until(
number_of_iterations=5,
interval=1,
func=lambda: expect_tenant_to_download_timeline(client, tenant_id),
)
restored_timelines = client.timeline_list(tenant_id)
assert (
len(restored_timelines) == 1
), f"Tenant {tenant_id} should have its timeline reattached after its layer is downloaded from the remote storage"
restored_timeline = restored_timelines[0]
assert restored_timeline["timeline_id"] == str(
timeline_id
), f"Tenant {tenant_id} should have its old timeline {timeline_id} restored from the remote storage"
def expect_tenant_to_download_timeline(
client: NeonPageserverHttpClient,
tenant_id: TenantId,
):
for tenant in client.tenant_list():
if tenant["id"] == str(tenant_id):
assert not tenant.get(
"has_in_progress_downloads", True
), f"Tenant {tenant_id} should have no downloads in progress"
return
assert False, f"Tenant {tenant_id} is missing on pageserver"

View File

@@ -19,7 +19,6 @@ anyhow = { version = "1", features = ["backtrace", "std"] }
bstr = { version = "0.2", features = ["lazy_static", "regex-automata", "serde", "serde1", "serde1-nostd", "std", "unicode"] }
bytes = { version = "1", features = ["serde", "std"] }
chrono = { version = "0.4", features = ["clock", "libc", "oldtime", "serde", "std", "time", "winapi"] }
crossbeam-utils = { version = "0.8", features = ["once_cell", "std"] }
either = { version = "1", features = ["use_std"] }
fail = { version = "0.5", default-features = false, features = ["failpoints"] }
hashbrown = { version = "0.12", features = ["ahash", "inline-more", "raw"] }