Compare commits

...

12 Commits

Author SHA1 Message Date
Christian Schwarz
2d90b95b6a implement a benchmark for tokio tcp handling to figure out perf bottleneck in no_op libpq benchmark 2023-11-08 16:34:57 +00:00
Christian Schwarz
06b57361e4 implement a standalone no-op server
usable by getpage_bench_libpq by running it on a different
port than the pageserver libpq listener, and overriding
connstring for getpage_bench_libpq to point to the
noop_server
2023-11-08 16:34:57 +00:00
Christian Schwarz
36be29d0b8 getpage_bench_libpq: support for the no-op mode 2023-11-08 16:34:56 +00:00
Christian Schwarz
6a202cdf08 no-op pagestream request/response type (server-side impl) 2023-11-08 16:34:56 +00:00
Christian Schwarz
f51e608193 pq bench: avoid repeated conversion to_i128 2023-11-08 16:34:56 +00:00
Christian Schwarz
78a28f787c per-second RPS 2023-11-08 16:34:56 +00:00
Christian Schwarz
001a0e4006 pq bench: proper shutdown 2023-11-08 16:34:56 +00:00
Christian Schwarz
daa2ea7ebe http bench: sligthly improved stats 2023-11-08 16:34:56 +00:00
Christian Schwarz
37e8eba57f WIP: libpq-based client
depends on https://github.com/neondatabase/rust-postgres/pull/25
2023-11-08 16:34:56 +00:00
Christian Schwarz
f45882ef3c rename getpage_bench to getpage_bench_http 2023-11-08 16:34:56 +00:00
Christian Schwarz
d16d02d61d WIP: benchmark that does random getpage requests against the keyspace
backup of pageserver.toml

    d =1
    pg_distrib_dir ='/home/admin/neon-main/pg_install'
    http_auth_type ='Trust'
    pg_auth_type ='Trust'
    listen_http_addr ='127.0.0.1:9898'
    listen_pg_addr ='127.0.0.1:64000'
    broker_endpoint ='http://127.0.0.1:50051/'
    #control_plane_api ='http://127.0.0.1:1234/'

    # Initial configuration file created by 'pageserver --init'
    #listen_pg_addr = '127.0.0.1:64000'
    #listen_http_addr = '127.0.0.1:9898'

    #wait_lsn_timeout = '60 s'
    #wal_redo_timeout = '60 s'

    #max_file_descriptors = 10000
    #page_cache_size = 160000

    # initial superuser role name to use when creating a new tenant
    #initial_superuser_name = 'cloud_admin'

    #broker_endpoint = 'http://127.0.0.1:50051'

    #log_format = 'plain'

    #concurrent_tenant_size_logical_size_queries = '1'

    #metric_collection_interval = '10 min'
    #cached_metric_collection_interval = '0s'
    #synthetic_size_calculation_interval = '10 min'

    #disk_usage_based_eviction = { max_usage_pct = .., min_avail_bytes = .., period = "10s"}

    #background_task_maximum_delay = '10s'

    [tenant_config]
    #checkpoint_distance = 268435456 # in bytes
    #checkpoint_timeout = 10 m
    #compaction_target_size = 134217728 # in bytes
    #compaction_period = '20 s'
    #compaction_threshold = 10

    #gc_period = '1 hr'
    #gc_horizon = 67108864
    #image_creation_threshold = 3
    #pitr_interval = '7 days'

    #min_resident_size_override = .. # in bytes
    #evictions_low_residence_duration_metric_threshold = '24 hour'
    #gc_feedback = false

    # make it determinsitic
    gc_period = '0s'
    checkpoint_timeout = '3650 day'
    compaction_period = '20 s'
    compaction_threshold = 10
    compaction_target_size = 134217728
    checkpoint_distance = 268435456
    image_creation_threshold = 3

    [remote_storage]
    local_path = '/home/admin/neon-main/bench_repo_dir/repo/remote_storage_local_fs'
2023-11-08 16:34:56 +00:00
Christian Schwarz
947f6d9491 API to duplicate a tenant 2023-11-08 16:34:56 +00:00
21 changed files with 1405 additions and 17 deletions

29
Cargo.lock generated
View File

@@ -2927,6 +2927,16 @@ dependencies = [
"winapi",
]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
dependencies = [
"overload",
"winapi",
]
[[package]]
name = "num-bigint"
version = "0.4.3"
@@ -3193,6 +3203,12 @@ version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4030760ffd992bef45b0ae3f10ce1aba99e33464c90d14dd7c039884963ddc7a"
[[package]]
name = "overload"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]]
name = "pagectl"
version = "0.1.0"
@@ -3278,10 +3294,12 @@ dependencies = [
"tokio",
"tokio-io-timeout",
"tokio-postgres",
"tokio-stream",
"tokio-tar",
"tokio-util",
"toml_edit",
"tracing",
"tracing-subscriber",
"url",
"utils",
"walkdir",
@@ -3556,7 +3574,7 @@ dependencies = [
[[package]]
name = "postgres"
version = "0.19.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=ce7260db5998fe27167da42503905a12e7ad9048#ce7260db5998fe27167da42503905a12e7ad9048"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#ef8559b5f60f5c1d2b0184a62f49035600824518"
dependencies = [
"bytes",
"fallible-iterator",
@@ -3569,7 +3587,7 @@ dependencies = [
[[package]]
name = "postgres-native-tls"
version = "0.5.0"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=ce7260db5998fe27167da42503905a12e7ad9048#ce7260db5998fe27167da42503905a12e7ad9048"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#ef8559b5f60f5c1d2b0184a62f49035600824518"
dependencies = [
"native-tls",
"tokio",
@@ -3580,7 +3598,7 @@ dependencies = [
[[package]]
name = "postgres-protocol"
version = "0.6.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=ce7260db5998fe27167da42503905a12e7ad9048#ce7260db5998fe27167da42503905a12e7ad9048"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#ef8559b5f60f5c1d2b0184a62f49035600824518"
dependencies = [
"base64 0.20.0",
"byteorder",
@@ -3598,7 +3616,7 @@ dependencies = [
[[package]]
name = "postgres-types"
version = "0.2.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=ce7260db5998fe27167da42503905a12e7ad9048#ce7260db5998fe27167da42503905a12e7ad9048"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#ef8559b5f60f5c1d2b0184a62f49035600824518"
dependencies = [
"bytes",
"fallible-iterator",
@@ -5414,7 +5432,7 @@ dependencies = [
[[package]]
name = "tokio-postgres"
version = "0.7.7"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=ce7260db5998fe27167da42503905a12e7ad9048#ce7260db5998fe27167da42503905a12e7ad9048"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#ef8559b5f60f5c1d2b0184a62f49035600824518"
dependencies = [
"async-trait",
"byteorder",
@@ -5771,6 +5789,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77"
dependencies = [
"matchers",
"nu-ansi-term",
"once_cell",
"regex",
"serde",

View File

@@ -163,11 +163,11 @@ env_logger = "0.10"
log = "0.4"
## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
## Other git libraries
heapless = { default-features=false, features=[], git = "https://github.com/japaric/heapless.git", rev = "644653bf3b831c6bb4963be2de24804acf5e5001" } # upstream release pending
@@ -204,7 +204,7 @@ tonic-build = "0.9"
# This is only needed for proxy's tests.
# TODO: we should probably fork `tokio-postgres-rustls` instead.
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
################# Binary contents sections

View File

@@ -18,7 +18,7 @@ use utils::{
use crate::reltag::RelTag;
use anyhow::bail;
use bytes::{BufMut, Bytes, BytesMut};
use bytes::{Buf, BufMut, Bytes, BytesMut};
/// The state of a tenant in this pageserver.
///
@@ -572,15 +572,18 @@ pub enum PagestreamFeMessage {
Nblocks(PagestreamNblocksRequest),
GetPage(PagestreamGetPageRequest),
DbSize(PagestreamDbSizeRequest),
NoOp,
}
// Wrapped in libpq CopyData
#[derive(Debug)]
pub enum PagestreamBeMessage {
Exists(PagestreamExistsResponse),
Nblocks(PagestreamNblocksResponse),
GetPage(PagestreamGetPageResponse),
Error(PagestreamErrorResponse),
DbSize(PagestreamDbSizeResponse),
NoOp,
}
#[derive(Debug, PartialEq, Eq)]
@@ -679,6 +682,10 @@ impl PagestreamFeMessage {
bytes.put_u64(req.lsn.0);
bytes.put_u32(req.dbnode);
}
Self::NoOp => {
bytes.put_u8(4);
}
}
bytes.into()
@@ -729,6 +736,7 @@ impl PagestreamFeMessage {
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
dbnode: body.read_u32::<BigEndian>()?,
})),
4 => Ok(PagestreamFeMessage::NoOp),
_ => bail!("unknown smgr message tag: {:?}", msg_tag),
}
}
@@ -763,10 +771,46 @@ impl PagestreamBeMessage {
bytes.put_u8(104); /* tag from pagestore_client.h */
bytes.put_i64(resp.db_size);
}
Self::NoOp => {
bytes.put_u8(105);
}
}
bytes.into()
}
pub fn deserialize(buf: Bytes) -> anyhow::Result<Self> {
let mut buf = buf.reader();
let msg_tag = buf.read_u8()?;
match msg_tag {
100 => todo!(),
101 => todo!(),
102 => {
let buf = buf.get_ref();
/* TODO use constant */
if buf.len() == 8192 {
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
page: buf.clone(),
}))
} else {
anyhow::bail!("invalid page size: {}", buf.len());
}
}
103 => {
let buf = buf.get_ref();
let cstr = std::ffi::CStr::from_bytes_until_nul(buf)?;
let rust_str = cstr.to_str()?;
Ok(PagestreamBeMessage::Error(PagestreamErrorResponse {
message: rust_str.to_owned(),
}))
}
104 => todo!(),
105 => {
Ok(PagestreamBeMessage::NoOp)
},
_ => bail!("unknown tag: {:?}", msg_tag),
}
}
}
#[cfg(test)]

View File

@@ -114,7 +114,7 @@ impl RemotePath {
self.0.file_name()
}
pub fn join(&self, segment: &Utf8Path) -> Self {
pub fn join<P: AsRef<Utf8Path>>(&self, segment: P) -> Self {
Self(self.0.join(segment))
}

View File

@@ -82,6 +82,8 @@ enum-map.workspace = true
enumset.workspace = true
strum.workspace = true
strum_macros.workspace = true
tokio-stream.workspace = true
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
[dev-dependencies]
criterion.workspace = true

View File

@@ -0,0 +1,245 @@
use clap::Parser;
use hyper::client::conn::Parts;
use hyper::client::HttpConnector;
use hyper::{Body, Client, Uri};
use pageserver::{repository, tenant};
use rand::prelude::*;
use std::env::args;
use std::future::Future;
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use tokio::sync::mpsc::{channel, Sender};
use tokio::sync::Mutex as AsyncMutex;
use tokio::task::JoinHandle;
struct Key(repository::Key);
impl std::str::FromStr for Key {
type Err = anyhow::Error;
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
repository::Key::from_hex(s).map(Key)
}
}
struct KeyRange {
start: Key,
end: Key,
}
impl KeyRange {
fn len(&self) -> i128 {
self.end.0.to_i128() - self.start.0.to_i128()
}
}
#[derive(clap::Parser)]
struct Args {
#[clap(long, default_value = "http://localhost:9898")]
ps_endpoint: String,
// tenant_id: String,
// timeline_id: String,
num_tasks: usize,
num_requests: usize,
tenants: Option<Vec<String>>,
#[clap(long)]
pick_n_tenants: Option<usize>,
}
#[derive(Debug, Default)]
struct Stats {
completed_requests: AtomicU64,
}
impl Stats {
fn inc(&self) {
self.completed_requests.fetch_add(1, Ordering::Relaxed);
}
}
#[tokio::main]
async fn main() {
let args: &'static Args = Box::leak(Box::new(Args::parse()));
let client = Client::new();
let tenants = if let Some(tenants) = &args.tenants {
tenants.clone()
} else {
// let tenant_id = "b97965931096047b2d54958756baee7b";
// let timeline_id = "2868f84a8d166779e4c651b116c45059";
let resp = client
.get(Uri::try_from(&format!("{}/v1/tenant", args.ps_endpoint)).unwrap())
.await
.unwrap();
let body = hyper::body::to_bytes(resp).await.unwrap();
let tenants: serde_json::Value = serde_json::from_slice(&body).unwrap();
let mut out = Vec::new();
for t in tenants.as_array().unwrap() {
if let Some(limit) = args.pick_n_tenants {
if out.len() >= limit {
break;
}
}
out.push(t.get("id").unwrap().as_str().unwrap().to_owned());
}
if let Some(limit) = args.pick_n_tenants {
assert_eq!(out.len(), limit);
}
out
};
let mut tenant_timelines = Vec::new();
for tenant_id in tenants {
let resp = client
.get(
Uri::try_from(&format!(
"{}/v1/tenant/{}/timeline",
args.ps_endpoint, tenant_id
))
.unwrap(),
)
.await
.unwrap();
let body = hyper::body::to_bytes(resp).await.unwrap();
let timelines: serde_json::Value = serde_json::from_slice(&body).unwrap();
for t in timelines.as_array().unwrap() {
let timeline_id = t.get("timeline_id").unwrap().as_str().unwrap().to_owned();
tenant_timelines.push((tenant_id.clone(), timeline_id));
}
}
println!("tenant_timelines:\n{:?}", tenant_timelines);
let mut stats = Arc::new(Stats::default());
tokio::spawn({
let stats = Arc::clone(&stats);
async move {
loop {
let start = std::time::Instant::now();
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
let completed_requests = stats.completed_requests.swap(0, Ordering::Relaxed);
let elapsed = start.elapsed();
println!(
"RPS: {:.0}",
completed_requests as f64 / elapsed.as_secs_f64()
);
}
}
});
let mut tasks = Vec::new();
for (tenant_id, timeline_id) in tenant_timelines {
let t = tokio::spawn(timeline(
args,
client.clone(),
tenant_id,
timeline_id,
Arc::clone(&stats),
));
tasks.push(t);
}
for t in tasks {
t.await.unwrap();
}
}
fn timeline(
args: &'static Args,
client: Client<HttpConnector, Body>,
tenant_id: String,
timeline_id: String,
stats: Arc<Stats>,
) -> impl Future<Output = ()> {
async move {
let mut resp = client
.get(
Uri::try_from(&format!(
"{}/v1/tenant/{}/timeline/{}/keyspace",
args.ps_endpoint, tenant_id, timeline_id
))
.unwrap(),
)
.await
.unwrap();
if !resp.status().is_success() {
panic!("Failed to get keyspace: {resp:?}");
}
let body = hyper::body::to_bytes(resp).await.unwrap();
let keyspace: serde_json::Value = serde_json::from_slice(&body).unwrap();
let lsn = Arc::new(keyspace["at_lsn"].as_str().unwrap().to_owned());
let ranges = keyspace["keys"]
.as_array()
.unwrap()
.iter()
.map(|r| {
let r = r.as_array().unwrap();
assert_eq!(r.len(), 2);
let start = Key::from_str(r[0].as_str().unwrap()).unwrap();
let end = Key::from_str(r[1].as_str().unwrap()).unwrap();
KeyRange { start, end }
})
.collect::<Vec<_>>();
// weighted ranges
let weights = ranges.iter().map(|r| r.len()).collect::<Vec<_>>();
let ranges = Arc::new(ranges);
let weights = Arc::new(weights);
let (tx, mut rx) = channel::<i32>(1000);
let tx = Arc::new(AsyncMutex::new(tx));
let mut tasks = Vec::<JoinHandle<()>>::new();
let start = std::time::Instant::now();
for i in 0..args.num_tasks {
let ranges = ranges.clone();
let weights = weights.clone();
let lsn = lsn.clone();
let client = client.clone();
let tenant_id = tenant_id.clone();
let timeline_id = timeline_id.clone();
let stats = Arc::clone(&stats);
let task = tokio::spawn(async move {
for i in 0..args.num_requests {
let key = {
let mut rng = rand::thread_rng();
let r = ranges.choose_weighted(&mut rng, |r| r.len()).unwrap();
let key = rng.gen_range((r.start.0.to_i128()..r.end.0.to_i128()));
key
};
let url = format!(
"{}/v1/tenant/{}/timeline/{}/getpage?key={:036x}&lsn={}",
args.ps_endpoint, tenant_id, timeline_id, key, lsn
);
let uri = url.parse::<Uri>().unwrap();
let resp = client.get(uri).await.unwrap();
stats.inc();
}
});
tasks.push(task);
}
drop(tx);
for task in tasks {
task.await.unwrap();
}
let elapsed = start.elapsed();
println!(
"RPS: {:.0}",
(args.num_requests * args.num_tasks) as f64 / elapsed.as_secs_f64()
);
}
}

View File

@@ -0,0 +1,411 @@
use anyhow::Context;
use clap::Parser;
use futures::{SinkExt, TryStreamExt};
use hyper::client::conn::Parts;
use hyper::client::HttpConnector;
use hyper::{Client, Uri};
use pageserver::page_cache::PAGE_SZ;
use pageserver::pgdatadir_mapping::{is_rel_block_key, key_to_rel_block};
use pageserver::{repository, tenant};
use pageserver_api::models::{
PagestreamBeMessage, PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse,
};
use pageserver_api::reltag::RelTag;
use rand::prelude::*;
use scopeguard::defer;
use std::env::args;
use std::future::Future;
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use tokio::sync::mpsc::{channel, Sender};
use tokio::sync::Mutex as AsyncMutex;
use tokio::task::JoinHandle;
use tokio_stream::{Stream, StreamExt};
use utils::completion;
use utils::lsn::Lsn;
struct Key(repository::Key);
impl std::str::FromStr for Key {
type Err = anyhow::Error;
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
repository::Key::from_hex(s).map(Key)
}
}
struct KeyRange {
start: i128,
end: i128,
}
impl KeyRange {
fn len(&self) -> i128 {
self.end - self.start
}
}
struct RelTagBlockNo {
rel_tag: RelTag,
block_no: u32,
}
#[derive(clap::Parser)]
struct Args {
#[clap(long, default_value = "http://localhost:9898")]
ps_endpoint: String,
#[clap(long, default_value = "postgres://postgres@localhost:64000")]
pq_client_connstring: String,
// tenant_id: String,
// timeline_id: String,
num_tasks: usize,
num_requests: usize,
tenants: Option<Vec<String>>,
#[clap(long)]
pick_n_tenants: Option<usize>,
#[clap(subcommand)]
mode: Mode,
}
#[derive(clap::Parser, Clone)]
enum Mode {
GetPage,
NoOp,
}
#[derive(Debug, Default)]
struct Stats {
completed_requests: AtomicU64,
}
impl Stats {
fn inc(&self) {
self.completed_requests.fetch_add(1, Ordering::Relaxed);
}
}
#[tokio::main]
async fn main() {
let args: &'static Args = Box::leak(Box::new(Args::parse()));
// std::env::set_var("RUST_LOG", "info,tokio_postgres=trace");
// tracing_subscriber::fmt::init();
let client = Client::new();
let tenants = if let Some(tenants) = &args.tenants {
tenants.clone()
} else {
// let tenant_id = "b97965931096047b2d54958756baee7b";
// let timeline_id = "2868f84a8d166779e4c651b116c45059";
let resp = client
.get(Uri::try_from(&format!("{}/v1/tenant", args.ps_endpoint)).unwrap())
.await
.unwrap();
let body = hyper::body::to_bytes(resp).await.unwrap();
let tenants: serde_json::Value = serde_json::from_slice(&body).unwrap();
let mut out = Vec::new();
for t in tenants.as_array().unwrap() {
if let Some(limit) = args.pick_n_tenants {
if out.len() >= limit {
break;
}
}
out.push(t.get("id").unwrap().as_str().unwrap().to_owned());
}
if let Some(limit) = args.pick_n_tenants {
assert_eq!(out.len(), limit);
}
out
};
let mut tenant_timelines = Vec::new();
for tenant_id in tenants {
let resp = client
.get(
Uri::try_from(&format!(
"{}/v1/tenant/{}/timeline",
args.ps_endpoint, tenant_id
))
.unwrap(),
)
.await
.unwrap();
let body = hyper::body::to_bytes(resp).await.unwrap();
let timelines: serde_json::Value = serde_json::from_slice(&body).unwrap();
for t in timelines.as_array().unwrap() {
let timeline_id = t.get("timeline_id").unwrap().as_str().unwrap().to_owned();
tenant_timelines.push((tenant_id.clone(), timeline_id));
}
}
println!("tenant_timelines:\n{:?}", tenant_timelines);
let mut stats = Arc::new(Stats::default());
tokio::spawn({
let stats = Arc::clone(&stats);
async move {
loop {
let start = std::time::Instant::now();
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
let completed_requests = stats.completed_requests.swap(0, Ordering::Relaxed);
let elapsed = start.elapsed();
println!(
"RPS: {:.0}",
completed_requests as f64 / elapsed.as_secs_f64()
);
}
}
});
let mut tasks = Vec::new();
for (tenant_id, timeline_id) in tenant_timelines {
let stats = Arc::clone(&stats);
let t = tokio::spawn(timeline(
args,
client.clone(),
tenant_id,
timeline_id,
stats,
));
tasks.push(t);
}
for t in tasks {
t.await.unwrap();
}
}
fn timeline(
args: &'static Args,
http_client: Client<HttpConnector, hyper::Body>,
tenant_id: String,
timeline_id: String,
stats: Arc<Stats>,
) -> impl Future<Output = ()> + Send + Sync {
async move {
let mut resp = http_client
.get(
Uri::try_from(&format!(
"{}/v1/tenant/{}/timeline/{}/keyspace",
args.ps_endpoint, tenant_id, timeline_id
))
.unwrap(),
)
.await
.unwrap();
if !resp.status().is_success() {
panic!("Failed to get keyspace: {resp:?}");
}
let body = hyper::body::to_bytes(resp).await.unwrap();
let keyspace: serde_json::Value = serde_json::from_slice(&body).unwrap();
let lsn: Lsn = keyspace["at_lsn"].as_str().unwrap().parse().unwrap();
let ranges = keyspace["keys"]
.as_array()
.unwrap()
.iter()
.filter_map(|r| {
let r = r.as_array().unwrap();
assert_eq!(r.len(), 2);
let start = Key::from_str(r[0].as_str().unwrap()).unwrap();
let end = Key::from_str(r[1].as_str().unwrap()).unwrap();
// filter out non-relblock keys
match (is_rel_block_key(start.0), is_rel_block_key(end.0)) {
(true, true) => Some(KeyRange {
start: start.0.to_i128(),
end: end.0.to_i128(),
}),
(true, false) | (false, true) => {
unimplemented!("split up range")
}
(false, false) => None,
}
})
.collect::<Vec<_>>();
// weighted ranges
let weights = ranges.iter().map(|r| r.len()).collect::<Vec<_>>();
let ranges = Arc::new(ranges);
let weights = Arc::new(weights);
let mut tasks = Vec::<JoinHandle<()>>::new();
let start = std::time::Instant::now();
for i in 0..args.num_tasks {
let ranges = ranges.clone();
let weights = weights.clone();
let client = http_client.clone();
let tenant_id = tenant_id.clone();
let timeline_id = timeline_id.clone();
let task = tokio::spawn({
let stats = Arc::clone(&stats);
async move {
let mut client = getpage_client::Client::new(
args.pq_client_connstring.clone(),
tenant_id.clone(),
timeline_id.clone(),
)
.await
.unwrap();
for i in 0..args.num_requests {
match args.mode {
Mode::GetPage => {
let key = {
let mut rng = rand::thread_rng();
let r = ranges.choose_weighted(&mut rng, |r| r.len()).unwrap();
let key: i128 = rng.gen_range((r.start..r.end));
let key = repository::Key::from_i128(key);
// XXX filter these out when we iterate the keyspace
assert!(
is_rel_block_key(key),
"we filter non-relblock keys out above"
);
let (rel_tag, block_no) =
key_to_rel_block(key).expect("we just checked");
RelTagBlockNo { rel_tag, block_no }
};
client
.getpage(key, lsn)
.await
.with_context(|| {
format!(
"getpage for tenant {} timeline {}",
tenant_id, timeline_id
)
})
.unwrap();
}
Mode::NoOp => {
client.noop().await.unwrap();
}
}
stats.inc();
}
client.shutdown().await;
}
});
tasks.push(task);
}
for task in tasks {
task.await.unwrap();
}
}
}
mod getpage_client {
use std::pin::Pin;
use futures::SinkExt;
use pageserver_api::models::{
PagestreamBeMessage, PagestreamFeMessage, PagestreamGetPageRequest,
PagestreamGetPageResponse,
};
use tokio::task::JoinHandle;
use tokio_stream::StreamExt;
use tokio_util::sync::CancellationToken;
use utils::lsn::Lsn;
use crate::RelTagBlockNo;
pub(crate) struct Client {
copy_both: Pin<Box<tokio_postgres::CopyBothDuplex<bytes::Bytes>>>,
cancel_on_client_drop: Option<tokio_util::sync::DropGuard>,
conn_task: JoinHandle<()>,
}
impl Client {
pub fn new(
connstring: String,
tenant_id: String,
timeline_id: String,
) -> impl std::future::Future<Output = anyhow::Result<Self>> + Send {
async move {
let (client, connection) =
tokio_postgres::connect(&connstring, postgres::NoTls).await?;
let conn_task_cancel = CancellationToken::new();
let conn_task = tokio::spawn({
let conn_task_cancel = conn_task_cancel.clone();
async move {
tokio::select! {
_ = conn_task_cancel.cancelled() => {
return;
}
res = connection => {
res.unwrap();
}
}
}
});
let copy_both: tokio_postgres::CopyBothDuplex<bytes::Bytes> = client
.copy_both_simple(&format!("pagestream {tenant_id} {timeline_id}"))
.await?;
Ok(Self {
copy_both: Box::pin(copy_both),
conn_task,
cancel_on_client_drop: Some(conn_task_cancel.drop_guard()),
})
}
}
pub async fn shutdown(mut self) {
let _ = self.cancel_on_client_drop.take();
self.conn_task.await.unwrap();
}
pub async fn getpage(
&mut self,
key: RelTagBlockNo,
lsn: Lsn,
) -> anyhow::Result<PagestreamGetPageResponse> {
let req = PagestreamGetPageRequest {
latest: false,
rel: key.rel_tag,
blkno: key.block_no,
lsn,
};
let req = PagestreamFeMessage::GetPage(req);
match self.do_request(req).await? {
PagestreamBeMessage::GetPage(p) => Ok(p),
x => anyhow::bail!("Unexpected response: {:?}", x),
}
}
pub async fn noop(&mut self) -> anyhow::Result<()> {
match self.do_request(PagestreamFeMessage::NoOp).await? {
PagestreamBeMessage::NoOp => Ok(()),
x => anyhow::bail!("Unexpected response: {:?}", x),
}
}
async fn do_request(
&mut self,
req: PagestreamFeMessage,
) -> Result<PagestreamBeMessage, anyhow::Error> {
let req: bytes::Bytes = req.serialize();
// let mut req = tokio_util::io::ReaderStream::new(&req);
let mut req = tokio_stream::once(Ok(req));
self.copy_both.send_all(&mut req).await?;
let next: Option<Result<bytes::Bytes, _>> = self.copy_both.next().await;
let next = next.unwrap().unwrap();
match PagestreamBeMessage::deserialize(next)? {
PagestreamBeMessage::Error(e) => anyhow::bail!("Error: {:?}", e),
x => Ok(x),
}
}
}
}

View File

@@ -0,0 +1,109 @@
use anyhow::Context;
use bytes::Buf;
use clap::Parser;
use pageserver_api::models::{PagestreamBeMessage, PagestreamErrorResponse, PagestreamFeMessage};
use postgres_backend::{AuthType, PostgresBackend, QueryError};
use pq_proto::{BeMessage, FeMessage};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::sync::CancellationToken;
#[derive(clap::Parser)]
struct Args {
bind: String,
}
#[tokio::main]
async fn main() {
let args = Args::parse();
let listener = tokio::net::TcpListener::bind(&args.bind).await.unwrap();
loop {
let (socket, _) = listener.accept().await.unwrap();
tokio::spawn(async move {
handle_connection(socket).await.unwrap();
});
}
}
async fn handle_connection(socket: tokio::net::TcpStream) -> anyhow::Result<()> {
socket
.set_nodelay(true)
.context("could not set TCP_NODELAY")?;
let peer_addr = socket.peer_addr().context("get peer address")?;
let socket = tokio_io_timeout::TimeoutReader::new(socket);
tokio::pin!(socket);
let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, AuthType::Trust, None)?;
let mut conn_handler = NoOpHandler;
let cancel = CancellationToken::new();
pgbackend
.run(&mut conn_handler, || {
let cancel = cancel.clone();
async move { cancel.cancelled().await }
})
.await?;
anyhow::Ok(())
}
struct NoOpHandler;
#[async_trait::async_trait]
impl<IO> postgres_backend::Handler<IO> for NoOpHandler
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
fn startup(
&mut self,
_pgb: &mut PostgresBackend<IO>,
_sm: &pq_proto::FeStartupPacket,
) -> Result<(), QueryError> {
Ok(())
}
async fn process_query(
&mut self,
pgb: &mut PostgresBackend<IO>,
query_string: &str,
) -> Result<(), QueryError> {
if !query_string.starts_with("pagestream ") {
return Err(QueryError::Other(anyhow::anyhow!("not a pagestream query")));
}
// switch client to COPYBOTH
pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
pgb.flush().await?;
loop {
let msg = pgb.read_message().await?;
let copy_data_bytes = match msg {
Some(FeMessage::CopyData(bytes)) => bytes,
Some(FeMessage::Terminate) => return Ok(()),
Some(m) => {
return Err(QueryError::Other(anyhow::anyhow!(
"unexpected message: {m:?} during COPY"
)));
}
None => return Ok(()), // client disconnected
};
let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
let response = match neon_fe_msg {
PagestreamFeMessage::NoOp => Ok(PagestreamBeMessage::NoOp),
x => Err(QueryError::Other(anyhow::anyhow!(
"this server only supports no-op: {x:?}"
))),
};
let response = response.unwrap_or_else(|e| {
PagestreamBeMessage::Error(PagestreamErrorResponse {
message: e.to_string(),
})
});
pgb.write_message_noflush(&BeMessage::CopyData(&response.serialize()))?;
pgb.flush().await?;
}
}
}

View File

@@ -0,0 +1,130 @@
use std::env::args;
use clap::Parser;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[derive(clap::Parser)]
struct Args {
#[clap(subcommand)]
mode: Mode,
}
#[derive(clap::Parser)]
enum Mode {
Client(Client),
Server(Server),
}
#[derive(clap::Parser)]
struct Client {
num_tasks: usize,
}
#[derive(clap::Parser)]
struct Server {}
#[tokio::main]
async fn main() {
let args: &'static _ = Box::leak(Box::new(Args::parse()));
match &args.mode {
Mode::Client(x) => client::client(x).await,
Mode::Server(x) => server::server(x).await,
}
}
mod client {
use std::sync::{atomic::{Ordering, AtomicU64}, Arc};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use super::Client;
#[derive(Debug, Default)]
struct Stats {
completed_requests: AtomicU64,
}
impl Stats {
fn inc(&self) {
self.completed_requests.fetch_add(1, Ordering::Relaxed);
}
}
pub(crate) async fn client(args: &'static Client) {
let mut stats = Arc::new(Stats::default());
tokio::spawn({
let stats = Arc::clone(&stats);
async move {
loop {
let start = std::time::Instant::now();
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
let completed_requests = stats.completed_requests.swap(0, Ordering::Relaxed);
let elapsed = start.elapsed();
println!(
"RPS: {:.0} RPS/client: {:.2}",
completed_requests as f64 / elapsed.as_secs_f64(),
completed_requests as f64 / elapsed.as_secs_f64() / args.num_tasks as f64,
);
}
}
});
let mut tasks = Vec::new();
for _ in 0..args.num_tasks {
let stats = Arc::clone(&stats);
let t = tokio::spawn(client_task(args, stats));
tasks.push(t);
}
for t in tasks {
t.await.unwrap();
}
}
async fn client_task(args: &'static Client, stats: Arc<Stats>) -> anyhow::Result<()> {
let mut conn = tokio::net::TcpStream::connect("localhost:65000").await?;
conn.set_nodelay(true)?;
loop {
let mut buf = [0u8; 1];
conn.write_all(&buf).await?;
conn.read_exact(&mut buf).await?;
stats.inc();
}
}
}
mod server {
use anyhow::Context;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use super::Server;
pub(crate) async fn server(args: &'static Server) {
let listener = tokio::net::TcpListener::bind("localhost:65000").await.unwrap();
loop {
let (socket, _) = listener.accept().await.unwrap();
tokio::spawn(async move {
server_handle_connection(args, socket).await.unwrap();
});
}
}
async fn server_handle_connection(
args: &'static Server,
socket: tokio::net::TcpStream,
) -> anyhow::Result<()> {
socket
.set_nodelay(true)
.context("could not set TCP_NODELAY")?;
// let socket = tokio_io_timeout::TimeoutReader::new(socket);
tokio::pin!(socket);
loop {
let mut buf = [0u8; 4096];
socket.read_exact(&mut buf).await?;
socket.write_all(&buf).await?;
}
}
}

View File

@@ -345,7 +345,7 @@ impl DeletionList {
result.extend(
timeline_layers
.into_iter()
.map(|l| timeline_remote_path.join(&Utf8PathBuf::from(l))),
.map(|l| timeline_remote_path.join(Utf8PathBuf::from(l))),
);
}
}

View File

@@ -747,6 +747,46 @@ async fn tenant_ignore_handler(
json_response(StatusCode::OK, ())
}
async fn tenant_duplicate_handler(
mut request: Request<Body>,
cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let src_tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
let request_data: TenantCreateRequest = json_request(&mut request).await?;
let new_tenant_id = request_data.new_tenant_id;
check_permission(&request, None)?;
let _timer = STORAGE_TIME_GLOBAL
.get_metric_with_label_values(&[StorageTimeOperation::DuplicateTenant.into()])
.expect("bug")
.start_timer();
let tenant_conf =
TenantConfOpt::try_from(&request_data.config).map_err(ApiError::BadRequest)?;
let state = get_state(&request);
let generation = get_request_generation(state, request_data.generation)?;
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
mgr::duplicate_tenant(
state.conf,
tenant_conf,
src_tenant_id,
new_tenant_id,
generation,
state.tenant_resources(),
&ctx,
&cancel,
)
.instrument(info_span!("tenant_duplicate", %src_tenant_id, tenant_id = %new_tenant_id))
.await?;
json_response(StatusCode::CREATED, TenantCreateResponse(new_tenant_id))
}
async fn tenant_list_handler(
request: Request<Body>,
_cancel: CancellationToken,
@@ -1787,6 +1827,9 @@ pub fn make_router(
.post("/v1/tenant/:tenant_id/ignore", |r| {
api_handler(r, tenant_ignore_handler)
})
.post("/v1/tenant/:tenant_id/duplicate", |r| {
api_handler(r, tenant_duplicate_handler)
})
.get("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
api_handler(r, timeline_detail_handler)
})

View File

@@ -51,6 +51,9 @@ pub enum StorageTimeOperation {
#[strum(serialize = "create tenant")]
CreateTenant,
#[strum(serialize = "duplicate tenant")]
DuplicateTenant,
}
pub static STORAGE_TIME_SUM_PER_TIMELINE: Lazy<CounterVec> = Lazy::new(|| {
@@ -757,6 +760,7 @@ pub enum SmgrQueryType {
GetRelSize,
GetPageAtLsn,
GetDbSize,
NoOp,
}
#[derive(Debug)]

View File

@@ -491,6 +491,11 @@ impl PageServerHandler {
span,
)
}
PagestreamFeMessage::NoOp => {
let _timer = metrics.start_timer(metrics::SmgrQueryType::NoOp);
let span = tracing::info_span!("no_op");
(Ok(PagestreamBeMessage::NoOp), span)
}
};
if let Err(e) = &response {

View File

@@ -1700,6 +1700,7 @@ const AUX_FILES_KEY: Key = Key {
// Reverse mappings for a few Keys.
// These are needed by WAL redo manager.
/// Guaranteed to return `Ok()` if [[is_rel_block_key]] returns `true` for `key`.
pub fn key_to_rel_block(key: Key) -> anyhow::Result<(RelTag, BlockNumber)> {
Ok(match key.field1 {
0x00 => (
@@ -1715,7 +1716,8 @@ pub fn key_to_rel_block(key: Key) -> anyhow::Result<(RelTag, BlockNumber)> {
})
}
fn is_rel_block_key(key: Key) -> bool {
/// See [[key_to_rel_block]].
pub fn is_rel_block_key(key: Key) -> bool {
key.field1 == 0x00 && key.field4 != 0
}

View File

@@ -6,9 +6,11 @@ use rand::{distributions::Alphanumeric, Rng};
use std::borrow::Cow;
use std::collections::HashMap;
use std::ops::Deref;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::fs;
use tokio::io::AsyncSeekExt;
use utils::timeout::{timeout_cancellable, TimeoutCancellableError};
use anyhow::Context;
@@ -30,7 +32,11 @@ use crate::metrics::TENANT_MANAGER as METRICS;
use crate::task_mgr::{self, TaskKind};
use crate::tenant::config::{AttachmentMode, LocationConf, LocationMode, TenantConfOpt};
use crate::tenant::delete::DeleteTenantFlow;
use crate::tenant::{create_tenant_files, AttachedTenantConf, SpawnMode, Tenant, TenantState};
use crate::tenant::span::debug_assert_current_span_has_tenant_id;
use crate::tenant::storage_layer::{DeltaLayer, ImageLayer, LayerFileName};
use crate::tenant::{
create_tenant_files, remote_timeline_client, AttachedTenantConf, IndexPart, Tenant, TenantState,
};
use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME, TEMP_FILE_SUFFIX};
use utils::crashsafe::path_with_suffix_extension;
@@ -40,7 +46,7 @@ use utils::id::{TenantId, TimelineId};
use super::delete::DeleteTenantError;
use super::timeline::delete::DeleteTimelineFlow;
use super::TenantSharedResources;
use super::{SpawnMode, TenantSharedResources};
/// For a tenant that appears in TenantsMap, it may either be
/// - `Attached`: has a full Tenant object, is elegible to service
@@ -735,6 +741,171 @@ pub(crate) async fn create_tenant(
Ok(created_tenant)
}
#[allow(clippy::too_many_arguments)]
pub(crate) async fn duplicate_tenant(
conf: &'static PageServerConf,
tenant_conf: TenantConfOpt,
src_tenant_id: TenantId,
new_tenant_id: TenantId,
generation: Generation,
resources: TenantSharedResources,
ctx: &RequestContext,
cancel: &CancellationToken,
) -> Result<(), TenantMapInsertError> {
debug_assert_current_span_has_tenant_id();
// TODO: would be nice to use tenant_map_insert here, but, we're not ready to create a Tenant object yet
let tempdir = path_with_suffix_extension(
conf.tenants_path().join(&new_tenant_id.to_string()),
&format!("duplication.{TEMP_FILE_SUFFIX}"),
);
tokio::fs::remove_dir_all(&tempdir)
.await
.or_else(|e| match e.kind() {
std::io::ErrorKind::NotFound => Ok(()),
_ => Err(e),
})
.context("pre-run clean up tempdir")?;
tokio::fs::create_dir(&tempdir)
.await
.context("create tempdir")?;
// Copy the tenant's data in S3
let remote_storage = resources
.remote_storage
.as_ref()
.context("only works with remote storage")?;
let (remote_src_timelines, other_prefixes) = remote_timeline_client::list_remote_timelines(
remote_storage,
src_tenant_id,
cancel.clone(),
)
.await
.context("list src timelines")?;
if !other_prefixes.is_empty() {
return Err(TenantMapInsertError::Other(anyhow::anyhow!(
"unimplemented: handling of other prefixes in src tenant: {:?}",
other_prefixes
)));
}
info!(?remote_src_timelines, "got src timelines");
for timeline_id in remote_src_timelines {
async {
let tempdir = tempdir.join(&timeline_id.to_string());
tokio::fs::create_dir(&tempdir)
.await
.context("create tempdir for timeline")?;
let remote_src_tl =
remote_timeline_client::remote_timeline_path(&src_tenant_id, &timeline_id);
let remote_dst_tl =
remote_timeline_client::remote_timeline_path(&new_tenant_id, &timeline_id);
let object_names = remote_storage
.list_prefixes(Some(&remote_src_tl))
.await
.context("list timeline remote prefix")?;
for name in object_names {
async {
let name = name.object_name().context(
"list_prefixes return values should always have object_name()=Some",
)?;
let remote_src_obj = remote_src_tl.join(name);
let remote_dst_obj = remote_dst_tl.join(name);
let tmp_obj_filepath = tempdir.join(name);
let mut tmp_obj_file = tokio::fs::OpenOptions::new()
.read(true)
.write(true)
.create_new(true)
.open(&tmp_obj_filepath)
.await
.context("create temp file")?;
let mut tmp_dl = remote_storage
.download(&remote_src_obj)
.await
.context("start download")?;
let tmp_obj_size =
tokio::io::copy(&mut tmp_dl.download_stream, &mut tmp_obj_file)
.await
.context("do the download")?;
if name == IndexPart::FILE_NAME {
// needs no patching
} else {
let name = LayerFileName::from_str(name).map_err(|e: String| {
anyhow::anyhow!("unknown key in timeline s3 prefix: {name:?}: {e}")
})?;
match name {
LayerFileName::Image(_) => {
ImageLayer::rewrite_tenant_timeline(
&tmp_obj_filepath,
new_tenant_id,
timeline_id, /* leave as is */
ctx,
)
.await
.context("rewrite tenant timeline")?;
}
LayerFileName::Delta(_) => {
DeltaLayer::rewrite_tenant_timeline(
&tmp_obj_filepath,
new_tenant_id,
timeline_id, /* leave as is */
ctx,
)
.await
.context("rewrite tenant timeline")?;
}
}
}
info!(?remote_dst_obj, "uploading");
tmp_obj_file
.seek(std::io::SeekFrom::Start(0))
.await
.context("seek tmp file to beginning for upload")?;
remote_storage
.upload(
tmp_obj_file,
tmp_obj_size as usize,
&remote_dst_obj,
tmp_dl.metadata,
)
.await
.context("upload modified")?;
tokio::fs::remove_file(tmp_obj_filepath)
.await
.context("remove temp file")?;
anyhow::Ok(())
}
.instrument(info_span!("copy object", object_name=?name))
.await
.context("copy object")?;
}
anyhow::Ok(())
}
.instrument(info_span!("copy_timeline", timeline_id=%timeline_id))
.await?;
}
tokio::fs::remove_dir_all(&tempdir)
.await
.context("post-run clean up tempdir")?;
attach_tenant(conf, new_tenant_id, generation, tenant_conf, resources, ctx).await
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum SetNewTenantConfigError {
#[error(transparent)]

View File

@@ -609,6 +609,49 @@ impl Drop for DeltaLayerWriter {
}
}
impl DeltaLayer {
/// Assume the file at `path` is corrupt if this function returns with an error.
pub(crate) async fn rewrite_tenant_timeline(
path: &Utf8Path,
new_tenant: TenantId,
new_timeline: TimelineId,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let file = VirtualFile::open_with_options(
path,
&*std::fs::OpenOptions::new().read(true).write(true),
)
.await
.with_context(|| format!("Failed to open file '{}'", path))?;
let file = FileBlockReader::new(file);
let summary_blk = file.read_blk(0, ctx).await?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
let mut file = file.file;
if actual_summary.magic != DELTA_FILE_MAGIC {
bail!("File '{}' is not a delta layer", path);
}
let new_summary = Summary {
tenant_id: new_tenant,
timeline_id: new_timeline,
..actual_summary
};
let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new();
Summary::ser_into(&new_summary, &mut buf)?;
if buf.spilled() {
// The code in ImageLayerWriterInner just warn!()s for this.
// It should probably error out as well.
anyhow::bail!(
"Used more than one page size for summary buffer: {}",
buf.len()
);
}
file.seek(SeekFrom::Start(0)).await?;
file.write_all(&buf).await?;
Ok(())
}
}
impl DeltaLayerInner {
pub(super) async fn load(
path: &Utf8Path,

View File

@@ -294,6 +294,49 @@ impl ImageLayer {
}
}
impl ImageLayer {
/// Assume the file at `path` is corrupt if this function returns with an error.
pub(crate) async fn rewrite_tenant_timeline(
path: &Utf8Path,
new_tenant: TenantId,
new_timeline: TimelineId,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let file = VirtualFile::open_with_options(
path,
&*std::fs::OpenOptions::new().read(true).write(true),
)
.await
.with_context(|| format!("Failed to open file '{}'", path))?;
let file = FileBlockReader::new(file);
let summary_blk = file.read_blk(0, ctx).await?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
let mut file = file.file;
if actual_summary.magic != IMAGE_FILE_MAGIC {
bail!("File '{}' is not a delta layer", path);
}
let new_summary = Summary {
tenant_id: new_tenant,
timeline_id: new_timeline,
..actual_summary
};
let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new();
Summary::ser_into(&new_summary, &mut buf)?;
if buf.spilled() {
// The code in ImageLayerWriterInner just warn!()s for this.
// It should probably error out as well.
anyhow::bail!(
"Used more than one page size for summary buffer: {}",
buf.len()
);
}
file.seek(SeekFrom::Start(0)).await?;
file.write_all(&buf).await?;
Ok(())
}
}
impl ImageLayerInner {
pub(super) async fn load(
path: &Utf8Path,

View File

@@ -0,0 +1,43 @@
# Usage from top of repo:
# poetry run python3 test_runner/duplicate_tenant.py b97965931096047b2d54958756baee7b 10
from queue import Queue
import sys
import threading
import requests
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.types import TenantId
initial_tenant = sys.argv[1]
ncopies = int(sys.argv[2])
numthreads = int(sys.argv[3])
# class DuckTypedNeonEnv:
# pass
# cli = NeonCli(DuckTypedNeonEnv())
q = Queue()
for i in range(0, ncopies):
q.put(i)
for i in range(0, numthreads):
q.put(None)
def create():
while True:
if q.get() == None:
break
new_tenant = TenantId.generate()
res = requests.post(
f"http://localhost:9898/v1/tenant/{initial_tenant}/duplicate",
json={"new_tenant_id": str(new_tenant)},
)
res.raise_for_status()
for i in range(0, numthreads):
threading.Thread(target=create).start()

View File

@@ -219,6 +219,25 @@ class PageserverHttpClient(requests.Session):
assert isinstance(new_tenant_id, str)
return TenantId(new_tenant_id)
def tenant_duplicate(
self, src_tenant_id: TenantId, new_tenant_id: TenantId, conf: Optional[Dict[str, Any]] = None
) -> TenantId:
if conf is not None:
assert "new_tenant_id" not in conf.keys()
res = self.post(
f"http://localhost:{self.port}/v1/tenant/{src_tenant_id}/duplicate",
json={
"new_tenant_id": str(new_tenant_id),
**(conf or {}),
},
)
self.verbose_error(res)
if res.status_code == 409:
raise Exception(f"could not create tenant: already exists for id {new_tenant_id}")
new_tenant_id = res.json()
assert isinstance(new_tenant_id, str)
return TenantId(new_tenant_id)
def tenant_attach(
self,
tenant_id: TenantId,

View File

@@ -0,0 +1,54 @@
import time
from fixtures.neon_fixtures import (
NeonEnvBuilder,
last_flush_lsn_upload,
)
from fixtures.remote_storage import (
RemoteStorageKind,
)
from fixtures.types import TenantId
from fixtures.log_helper import log
def test_tenant_duplicate(
neon_env_builder: NeonEnvBuilder,
):
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
env = neon_env_builder.init_start()
with env.endpoints.create_start("main", tenant_id=env.initial_tenant) as ep_main:
ep_main.safe_psql("CREATE TABLE foo (i int);")
ep_main.safe_psql("INSERT INTO foo VALUES (1), (2), (3);")
last_flush_lsn = last_flush_lsn_upload(
env, ep_main, env.initial_tenant, env.initial_timeline
)
new_tenant_id = TenantId.generate()
# timeline id remains unchanged with tenant_duplicate
# TODO: implement a remapping scheme so timeline ids remain globally unique
new_timeline_id = env.initial_timeline
log.info(f"Duplicate tenant/timeline will be: {new_tenant_id}/{new_timeline_id}")
ps_http = env.pageserver.http_client()
ps_http.tenant_duplicate(env.initial_tenant, new_tenant_id)
ps_http.tenant_delete(env.initial_tenant)
env.neon_cli.map_branch("duplicate", new_tenant_id, new_timeline_id)
# start read-only replicate and validate
with env.endpoints.create_start(
"duplicate", tenant_id=new_tenant_id, lsn=last_flush_lsn
) as ep_dup:
with ep_dup.connect() as conn:
with conn.cursor() as cur:
cur.execute("SELECT * FROM foo ORDER BY i;")
cur.fetchall() == [(1,), (2,), (3,)]
# ensure restarting PS works
env.pageserver.stop()
env.pageserver.start()

View File

@@ -74,6 +74,7 @@ fn analyze_trace<R: std::io::Read>(mut reader: R) {
prev = Some(req);
}
PagestreamFeMessage::DbSize(_) => {}
PagestreamFeMessage::NoOp => {},
};
}