mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-17 10:22:56 +00:00
Compare commits
7 Commits
release-pr
...
problame/b
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
bb5b5cbdac | ||
|
|
55cdf6c7ff | ||
|
|
82d9c68667 | ||
|
|
bc91c40f56 | ||
|
|
c5f58ef3f7 | ||
|
|
bb8531d920 | ||
|
|
7553bbe3f5 |
78
Cargo.lock
generated
78
Cargo.lock
generated
@@ -2568,6 +2568,26 @@ dependencies = [
|
||||
"windows-sys 0.48.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "io-uring"
|
||||
version = "0.5.13"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "dd1e1a01cfb924fd8c5c43b6827965db394f5a3a16c599ce03452266e1cf984c"
|
||||
dependencies = [
|
||||
"bitflags",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "io-uring"
|
||||
version = "0.6.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "460648e47a07a43110fbfa2e0b14afb2be920093c31e5dccc50e49568e099762"
|
||||
dependencies = [
|
||||
"bitflags",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ipnet"
|
||||
version = "2.7.2"
|
||||
@@ -2676,9 +2696,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
|
||||
|
||||
[[package]]
|
||||
name = "libc"
|
||||
version = "0.2.144"
|
||||
version = "0.2.147"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2b00cc1c228a6782d0f076e7b232802e0c5689d41bb5df366f2a6b6621cfdfe1"
|
||||
checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3"
|
||||
|
||||
[[package]]
|
||||
name = "libloading"
|
||||
@@ -3281,6 +3301,7 @@ dependencies = [
|
||||
"tenant_size_model",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-epoll-uring",
|
||||
"tokio-io-timeout",
|
||||
"tokio-postgres",
|
||||
"tokio-tar",
|
||||
@@ -4524,6 +4545,12 @@ dependencies = [
|
||||
"windows-sys 0.42.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "scoped-tls"
|
||||
version = "1.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294"
|
||||
|
||||
[[package]]
|
||||
name = "scopeguard"
|
||||
version = "1.1.0"
|
||||
@@ -5220,18 +5247,18 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "thiserror"
|
||||
version = "1.0.40"
|
||||
version = "1.0.47"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "978c9a314bd8dc99be594bc3c175faaa9794be04a5a5e153caba6915336cebac"
|
||||
checksum = "97a802ec30afc17eee47b2855fc72e0c4cd62be9b4efe6591edde0ec5bd68d8f"
|
||||
dependencies = [
|
||||
"thiserror-impl",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "thiserror-impl"
|
||||
version = "1.0.40"
|
||||
version = "1.0.47"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f9456a42c5b0d803c8cd86e73dd7cc9edd429499f37a3550d286d5e86720569f"
|
||||
checksum = "6bb623b56e39ab7dcd4b1b98bb6c8f8d907ed255b18de254088016b27a8ee19b"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
@@ -5357,22 +5384,39 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tokio"
|
||||
version = "1.28.1"
|
||||
version = "1.32.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0aa32867d44e6f2ce3385e89dceb990188b8bb0fb25b0cf576647a6f98ac5105"
|
||||
checksum = "17ed6077ed6cd6c74735e21f37eb16dc3935f96878b1fe961074089cc80893f9"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
"backtrace",
|
||||
"bytes",
|
||||
"libc",
|
||||
"mio",
|
||||
"num_cpus",
|
||||
"parking_lot 0.12.1",
|
||||
"pin-project-lite",
|
||||
"signal-hook-registry",
|
||||
"socket2 0.4.9",
|
||||
"socket2 0.5.3",
|
||||
"tokio-macros",
|
||||
"windows-sys 0.48.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-epoll-uring"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"futures",
|
||||
"io-uring 0.6.2",
|
||||
"libc",
|
||||
"once_cell",
|
||||
"scopeguard",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-uring",
|
||||
"tokio-util",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-io-timeout"
|
||||
version = "1.2.0"
|
||||
@@ -5489,6 +5533,20 @@ dependencies = [
|
||||
"tungstenite",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-uring"
|
||||
version = "0.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0d5e02bb137e030b3a547c65a3bd2f1836d66a97369fdcc69034002b10e155ef"
|
||||
dependencies = [
|
||||
"io-uring 0.5.13",
|
||||
"libc",
|
||||
"scoped-tls",
|
||||
"slab",
|
||||
"socket2 0.4.9",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-util"
|
||||
version = "0.7.8"
|
||||
|
||||
@@ -112,7 +112,7 @@ impl RemotePath {
|
||||
self.0.file_name()
|
||||
}
|
||||
|
||||
pub fn join(&self, segment: &Utf8Path) -> Self {
|
||||
pub fn join<P: AsRef<Utf8Path>>(&self, segment: P) -> Self {
|
||||
Self(self.0.join(segment))
|
||||
}
|
||||
|
||||
|
||||
@@ -82,6 +82,8 @@ enum-map.workspace = true
|
||||
enumset.workspace = true
|
||||
strum.workspace = true
|
||||
strum_macros.workspace = true
|
||||
tokio-epoll-uring = { path = "/home/admin/tokio-epoll-uring/tokio-epoll-uring" }
|
||||
#tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "problame/hacky-openat" }
|
||||
|
||||
[dev-dependencies]
|
||||
criterion.workspace = true
|
||||
|
||||
208
pageserver/src/bin/getpage_bench.rs
Normal file
208
pageserver/src/bin/getpage_bench.rs
Normal file
@@ -0,0 +1,208 @@
|
||||
use clap::Parser;
|
||||
use std::future::Future;
|
||||
use hyper::client::conn::Parts;
|
||||
use hyper::client::HttpConnector;
|
||||
use hyper::{Body, Client, Uri};
|
||||
use pageserver::{repository, tenant};
|
||||
use rand::prelude::*;
|
||||
use std::env::args;
|
||||
use std::str::FromStr;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::thread;
|
||||
use tokio::sync::mpsc::{channel, Sender};
|
||||
use tokio::sync::Mutex as AsyncMutex;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
struct Key(repository::Key);
|
||||
|
||||
impl std::str::FromStr for Key {
|
||||
type Err = anyhow::Error;
|
||||
|
||||
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
|
||||
repository::Key::from_hex(s).map(Key)
|
||||
}
|
||||
}
|
||||
|
||||
struct KeyRange {
|
||||
start: Key,
|
||||
end: Key,
|
||||
}
|
||||
|
||||
impl KeyRange {
|
||||
fn len(&self) -> i128 {
|
||||
self.end.0.to_i128() - self.start.0.to_i128()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(clap::Parser)]
|
||||
struct Args {
|
||||
#[clap(long, default_value = "http://localhost:9898")]
|
||||
ps_endpoint: String,
|
||||
// tenant_id: String,
|
||||
// timeline_id: String,
|
||||
num_tasks: usize,
|
||||
num_requests: usize,
|
||||
tenants: Option<Vec<String>>,
|
||||
#[clap(long)]
|
||||
pick_n_tenants: Option<usize>,
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let args: &'static Args = Box::leak(Box::new(Args::parse()));
|
||||
|
||||
let client = Client::new();
|
||||
|
||||
let tenants = if let Some(tenants) = &args.tenants {
|
||||
tenants.clone()
|
||||
} else {
|
||||
// let tenant_id = "b97965931096047b2d54958756baee7b";
|
||||
// let timeline_id = "2868f84a8d166779e4c651b116c45059";
|
||||
|
||||
let resp = client
|
||||
.get(Uri::try_from(&format!("{}/v1/tenant", args.ps_endpoint)).unwrap())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let body = hyper::body::to_bytes(resp).await.unwrap();
|
||||
let tenants: serde_json::Value = serde_json::from_slice(&body).unwrap();
|
||||
let mut out = Vec::new();
|
||||
for t in tenants.as_array().unwrap() {
|
||||
if let Some(limit) = args.pick_n_tenants {
|
||||
if out.len() >= limit {
|
||||
break;
|
||||
}
|
||||
}
|
||||
out.push(t.get("id").unwrap().as_str().unwrap().to_owned());
|
||||
}
|
||||
if let Some(limit) = args.pick_n_tenants {
|
||||
assert_eq!(out.len(), limit);
|
||||
}
|
||||
out
|
||||
};
|
||||
|
||||
let mut tenant_timelines = Vec::new();
|
||||
for tenant_id in tenants {
|
||||
let resp = client
|
||||
.get(
|
||||
Uri::try_from(&format!("{}/v1/tenant/{}/timeline", args.ps_endpoint, tenant_id))
|
||||
.unwrap(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let body = hyper::body::to_bytes(resp).await.unwrap();
|
||||
let timelines: serde_json::Value = serde_json::from_slice(&body).unwrap();
|
||||
for t in timelines.as_array().unwrap() {
|
||||
let timeline_id = t.get("timeline_id").unwrap().as_str().unwrap().to_owned();
|
||||
tenant_timelines.push((tenant_id.clone(), timeline_id));
|
||||
}
|
||||
}
|
||||
println!("tenant_timelines:\n{:?}", tenant_timelines);
|
||||
|
||||
let mut tasks = Vec::new();
|
||||
for (tenant_id, timeline_id) in tenant_timelines {
|
||||
let t = tokio::spawn(timeline(
|
||||
args,
|
||||
client.clone(),
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
));
|
||||
tasks.push(t);
|
||||
}
|
||||
|
||||
for t in tasks {
|
||||
t.await.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
fn timeline(
|
||||
args: &'static Args,
|
||||
client: Client<HttpConnector, Body>,
|
||||
tenant_id: String,
|
||||
timeline_id: String,
|
||||
) -> impl Future<Output = ()> {
|
||||
async move {
|
||||
let mut resp = client
|
||||
.get(
|
||||
Uri::try_from(&format!(
|
||||
"{}/v1/tenant/{}/timeline/{}/keyspace",
|
||||
args.ps_endpoint, tenant_id, timeline_id
|
||||
))
|
||||
.unwrap(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
if !resp.status().is_success() {
|
||||
panic!("Failed to get keyspace: {resp:?}");
|
||||
}
|
||||
let body = hyper::body::to_bytes(resp).await.unwrap();
|
||||
let keyspace: serde_json::Value = serde_json::from_slice(&body).unwrap();
|
||||
|
||||
let lsn = Arc::new(keyspace["at_lsn"].as_str().unwrap().to_owned());
|
||||
|
||||
let ranges = keyspace["keys"]
|
||||
.as_array()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.map(|r| {
|
||||
let r = r.as_array().unwrap();
|
||||
assert_eq!(r.len(), 2);
|
||||
let start = Key::from_str(r[0].as_str().unwrap()).unwrap();
|
||||
let end = Key::from_str(r[1].as_str().unwrap()).unwrap();
|
||||
KeyRange { start, end }
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// weighted ranges
|
||||
let weights = ranges.iter().map(|r| r.len()).collect::<Vec<_>>();
|
||||
|
||||
let ranges = Arc::new(ranges);
|
||||
let weights = Arc::new(weights);
|
||||
|
||||
let (tx, mut rx) = channel::<i32>(1000);
|
||||
let tx = Arc::new(AsyncMutex::new(tx));
|
||||
|
||||
let mut tasks = Vec::<JoinHandle<()>>::new();
|
||||
|
||||
let start = std::time::Instant::now();
|
||||
|
||||
for i in 0..args.num_tasks {
|
||||
let ranges = ranges.clone();
|
||||
let weights = weights.clone();
|
||||
let lsn = lsn.clone();
|
||||
let client = client.clone();
|
||||
let tenant_id = tenant_id.clone();
|
||||
let timeline_id = timeline_id.clone();
|
||||
let task = tokio::spawn(async move {
|
||||
for i in 0..args.num_requests {
|
||||
let key = {
|
||||
let mut rng = rand::thread_rng();
|
||||
let r = ranges.choose_weighted(&mut rng, |r| r.len()).unwrap();
|
||||
let key = rng.gen_range((r.start.0.to_i128()..r.end.0.to_i128()));
|
||||
key
|
||||
};
|
||||
let url = format!(
|
||||
"{}/v1/tenant/{}/timeline/{}/getpage?key={:036x}&lsn={}",
|
||||
args.ps_endpoint, tenant_id, timeline_id, key, lsn
|
||||
);
|
||||
let uri = url.parse::<Uri>().unwrap();
|
||||
let resp = client.get(uri).await.unwrap();
|
||||
}
|
||||
});
|
||||
tasks.push(task);
|
||||
}
|
||||
|
||||
drop(tx);
|
||||
|
||||
for task in tasks {
|
||||
task.await.unwrap();
|
||||
}
|
||||
|
||||
let elapsed = start.elapsed();
|
||||
println!(
|
||||
"RPS: {:.0}",
|
||||
(args.num_requests * args.num_tasks) as f64 / elapsed.as_secs_f64()
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -681,6 +681,45 @@ async fn tenant_ignore_handler(
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
async fn tenant_duplicate_handler(
|
||||
mut request: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let src_tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
|
||||
|
||||
let request_data: TenantCreateRequest = json_request(&mut request).await?;
|
||||
let new_tenant_id = request_data.new_tenant_id;
|
||||
check_permission(&request, None)?;
|
||||
|
||||
let _timer = STORAGE_TIME_GLOBAL
|
||||
.get_metric_with_label_values(&[StorageTimeOperation::DuplicateTenant.into()])
|
||||
.expect("bug")
|
||||
.start_timer();
|
||||
|
||||
let tenant_conf =
|
||||
TenantConfOpt::try_from(&request_data.config).map_err(ApiError::BadRequest)?;
|
||||
|
||||
let state = get_state(&request);
|
||||
|
||||
let generation = get_request_generation(state, request_data.generation)?;
|
||||
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
|
||||
|
||||
mgr::duplicate_tenant(
|
||||
state.conf,
|
||||
tenant_conf,
|
||||
src_tenant_id,
|
||||
new_tenant_id,
|
||||
generation,
|
||||
state.tenant_resources(),
|
||||
&ctx,
|
||||
)
|
||||
.instrument(info_span!("tenant_duplicate", %src_tenant_id, tenant_id = %new_tenant_id))
|
||||
.await?;
|
||||
|
||||
json_response(StatusCode::CREATED, TenantCreateResponse(new_tenant_id))
|
||||
}
|
||||
|
||||
async fn tenant_list_handler(
|
||||
request: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
@@ -1728,6 +1767,9 @@ pub fn make_router(
|
||||
.post("/v1/tenant/:tenant_id/ignore", |r| {
|
||||
api_handler(r, tenant_ignore_handler)
|
||||
})
|
||||
.post("/v1/tenant/:tenant_id/duplicate", |r| {
|
||||
api_handler(r, tenant_duplicate_handler)
|
||||
})
|
||||
.get("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
|
||||
api_handler(r, timeline_detail_handler)
|
||||
})
|
||||
|
||||
@@ -51,6 +51,9 @@ pub enum StorageTimeOperation {
|
||||
|
||||
#[strum(serialize = "create tenant")]
|
||||
CreateTenant,
|
||||
|
||||
#[strum(serialize = "duplicate tenant")]
|
||||
DuplicateTenant,
|
||||
}
|
||||
|
||||
pub static STORAGE_TIME_SUM_PER_TIMELINE: Lazy<CounterVec> = Lazy::new(|| {
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
use super::ephemeral_file::EphemeralFile;
|
||||
use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
|
||||
use crate::context::RequestContext;
|
||||
use crate::page_cache::{self, PageReadGuard, ReadBufResult, PAGE_SZ};
|
||||
use crate::page_cache::{self, PageReadGuard, ReadBufResult, PAGE_SZ, PageWriteGuard};
|
||||
use crate::virtual_file::VirtualFile;
|
||||
use bytes::Bytes;
|
||||
use std::ops::{Deref, DerefMut};
|
||||
@@ -169,7 +169,7 @@ impl FileBlockReader {
|
||||
}
|
||||
|
||||
/// Read a page from the underlying file into given buffer.
|
||||
async fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), std::io::Error> {
|
||||
async fn fill_buffer(&self, buf: PageWriteGuard<'static>, blkno: u32) -> Result<PageWriteGuard<'static>, std::io::Error> {
|
||||
assert!(buf.len() == PAGE_SZ);
|
||||
self.file
|
||||
.read_exact_at(buf, blkno as u64 * PAGE_SZ as u64)
|
||||
@@ -198,7 +198,7 @@ impl FileBlockReader {
|
||||
ReadBufResult::Found(guard) => Ok(guard.into()),
|
||||
ReadBufResult::NotFound(mut write_guard) => {
|
||||
// Read the page from disk into the buffer
|
||||
self.fill_buffer(write_guard.deref_mut(), blknum).await?;
|
||||
let write_guard = self.fill_buffer(write_guard, blknum).await?;
|
||||
Ok(write_guard.mark_valid().into())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -44,11 +44,11 @@ impl EphemeralFile {
|
||||
"ephemeral-{filename_disambiguator}"
|
||||
)));
|
||||
|
||||
let file = VirtualFile::open_with_options(
|
||||
&filename,
|
||||
OpenOptions::new().read(true).write(true).create(true),
|
||||
)
|
||||
.await?;
|
||||
let file = {
|
||||
let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new();
|
||||
options.read(true).write(true).create(true);
|
||||
VirtualFile::open_with_options_async(&filename, options).await?
|
||||
};
|
||||
|
||||
Ok(EphemeralFile {
|
||||
page_cache_file_id: page_cache::next_file_id(),
|
||||
@@ -89,10 +89,9 @@ impl EphemeralFile {
|
||||
return Ok(BlockLease::PageReadGuard(guard))
|
||||
}
|
||||
page_cache::ReadBufResult::NotFound(mut write_guard) => {
|
||||
let buf: &mut [u8] = write_guard.deref_mut();
|
||||
debug_assert_eq!(buf.len(), PAGE_SZ);
|
||||
self.file
|
||||
.read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)
|
||||
let write_guard = self
|
||||
.file
|
||||
.read_exact_at(write_guard, blknum as u64 * PAGE_SZ as u64)
|
||||
.await?;
|
||||
let read_guard = write_guard.mark_valid();
|
||||
return Ok(BlockLease::PageReadGuard(read_guard));
|
||||
|
||||
@@ -4,8 +4,10 @@
|
||||
use camino::{Utf8DirEntry, Utf8Path, Utf8PathBuf};
|
||||
use rand::{distributions::Alphanumeric, Rng};
|
||||
use std::collections::{hash_map, HashMap};
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use tokio::fs;
|
||||
use tokio::io::AsyncSeekExt;
|
||||
|
||||
use anyhow::Context;
|
||||
use once_cell::sync::Lazy;
|
||||
@@ -26,9 +28,11 @@ use crate::deletion_queue::DeletionQueueClient;
|
||||
use crate::task_mgr::{self, TaskKind};
|
||||
use crate::tenant::config::{AttachmentMode, LocationConf, LocationMode, TenantConfOpt};
|
||||
use crate::tenant::delete::DeleteTenantFlow;
|
||||
use crate::tenant::span::debug_assert_current_span_has_tenant_id;
|
||||
use crate::tenant::storage_layer::{DeltaLayer, ImageLayer, LayerFileName};
|
||||
use crate::tenant::{
|
||||
create_tenant_files, AttachMarkerMode, AttachedTenantConf, CreateTenantFilesMode, Tenant,
|
||||
TenantState,
|
||||
create_tenant_files, remote_timeline_client, AttachMarkerMode, AttachedTenantConf,
|
||||
CreateTenantFilesMode, IndexPart, Tenant, TenantState,
|
||||
};
|
||||
use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME, TEMP_FILE_SUFFIX};
|
||||
|
||||
@@ -695,6 +699,159 @@ pub(crate) async fn create_tenant(
|
||||
}).await
|
||||
}
|
||||
|
||||
pub(crate) async fn duplicate_tenant(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_conf: TenantConfOpt,
|
||||
src_tenant_id: TenantId,
|
||||
new_tenant_id: TenantId,
|
||||
generation: Generation,
|
||||
resources: TenantSharedResources,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), TenantMapInsertError> {
|
||||
debug_assert_current_span_has_tenant_id();
|
||||
|
||||
// TODO: would be nice to use tenant_map_insert here, but, we're not ready to create a Tenant object yet
|
||||
let tempdir = path_with_suffix_extension(
|
||||
conf.tenants_path().join(&new_tenant_id.to_string()),
|
||||
&format!("duplication.{TEMP_FILE_SUFFIX}"),
|
||||
);
|
||||
tokio::fs::remove_dir_all(&tempdir)
|
||||
.await
|
||||
.or_else(|e| match e.kind() {
|
||||
std::io::ErrorKind::NotFound => Ok(()),
|
||||
_ => Err(e),
|
||||
})
|
||||
.context("pre-run clean up tempdir")?;
|
||||
|
||||
tokio::fs::create_dir(&tempdir)
|
||||
.await
|
||||
.context("create tempdir")?;
|
||||
|
||||
// Copy the tenant's data in S3
|
||||
let remote_storage = resources
|
||||
.remote_storage
|
||||
.as_ref()
|
||||
.context("only works with remote storage")?;
|
||||
|
||||
let remote_src_timelines =
|
||||
remote_timeline_client::list_remote_timelines(remote_storage, src_tenant_id)
|
||||
.await
|
||||
.context("list src timelines")?;
|
||||
|
||||
info!(?remote_src_timelines, "got src timelines");
|
||||
|
||||
for timeline_id in remote_src_timelines {
|
||||
async {
|
||||
let tempdir = tempdir.join(&timeline_id.to_string());
|
||||
|
||||
tokio::fs::create_dir(&tempdir)
|
||||
.await
|
||||
.context("create tempdir for timeline")?;
|
||||
|
||||
let remote_src_tl =
|
||||
remote_timeline_client::remote_timeline_path(&src_tenant_id, &timeline_id);
|
||||
let remote_dst_tl =
|
||||
remote_timeline_client::remote_timeline_path(&new_tenant_id, &timeline_id);
|
||||
|
||||
let object_names = remote_storage
|
||||
.list_prefixes(Some(&remote_src_tl))
|
||||
.await
|
||||
.context("list timeline remote prefix")?;
|
||||
|
||||
for name in object_names {
|
||||
async {
|
||||
let name = name.object_name().context(
|
||||
"list_prefixes return values should always have object_name()=Some",
|
||||
)?;
|
||||
let remote_src_obj = remote_src_tl.join(name);
|
||||
let remote_dst_obj = remote_dst_tl.join(name);
|
||||
|
||||
let tmp_obj_filepath = tempdir.join(name);
|
||||
let mut tmp_obj_file = tokio::fs::OpenOptions::new()
|
||||
.read(true)
|
||||
.write(true)
|
||||
.create_new(true)
|
||||
.open(&tmp_obj_filepath)
|
||||
.await
|
||||
.context("create temp file")?;
|
||||
let mut tmp_dl = remote_storage
|
||||
.download(&remote_src_obj)
|
||||
.await
|
||||
.context("start download")?;
|
||||
let tmp_obj_size =
|
||||
tokio::io::copy(&mut tmp_dl.download_stream, &mut tmp_obj_file)
|
||||
.await
|
||||
.context("do the download")?;
|
||||
|
||||
if name == IndexPart::FILE_NAME {
|
||||
// needs no patching
|
||||
} else {
|
||||
let name = LayerFileName::from_str(name).map_err(|e: String| {
|
||||
anyhow::anyhow!("unknown key in timeline s3 prefix: {name:?}: {e}")
|
||||
})?;
|
||||
match name {
|
||||
LayerFileName::Image(_) => {
|
||||
ImageLayer::rewrite_tenant_timeline(
|
||||
&tmp_obj_filepath,
|
||||
new_tenant_id,
|
||||
timeline_id, /* leave as is */
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.context("rewrite tenant timeline")?;
|
||||
}
|
||||
LayerFileName::Delta(_) => {
|
||||
DeltaLayer::rewrite_tenant_timeline(
|
||||
&tmp_obj_filepath,
|
||||
new_tenant_id,
|
||||
timeline_id, /* leave as is */
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.context("rewrite tenant timeline")?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
info!(?remote_dst_obj, "uploading");
|
||||
|
||||
tmp_obj_file
|
||||
.seek(std::io::SeekFrom::Start(0))
|
||||
.await
|
||||
.context("seek tmp file to beginning for upload")?;
|
||||
remote_storage
|
||||
.upload(
|
||||
tmp_obj_file,
|
||||
tmp_obj_size as usize,
|
||||
&remote_dst_obj,
|
||||
tmp_dl.metadata,
|
||||
)
|
||||
.await
|
||||
.context("upload modified")?;
|
||||
|
||||
tokio::fs::remove_file(tmp_obj_filepath)
|
||||
.await
|
||||
.context("remove temp file")?;
|
||||
|
||||
anyhow::Ok(())
|
||||
}
|
||||
.instrument(info_span!("copy object", object_name=?name))
|
||||
.await
|
||||
.context("copy object")?;
|
||||
}
|
||||
anyhow::Ok(())
|
||||
}
|
||||
.instrument(info_span!("copy_timeline", timeline_id=%timeline_id))
|
||||
.await?;
|
||||
}
|
||||
|
||||
tokio::fs::remove_dir_all(&tempdir)
|
||||
.await
|
||||
.context("post-run clean up tempdir")?;
|
||||
|
||||
attach_tenant(conf, new_tenant_id, generation, tenant_conf, resources, ctx).await
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub(crate) enum SetNewTenantConfigError {
|
||||
#[error(transparent)]
|
||||
|
||||
@@ -844,6 +844,49 @@ impl Drop for DeltaLayerWriter {
|
||||
}
|
||||
}
|
||||
|
||||
impl DeltaLayer {
|
||||
/// Assume the file at `path` is corrupt if this function returns with an error.
|
||||
pub(crate) async fn rewrite_tenant_timeline(
|
||||
path: &Utf8Path,
|
||||
new_tenant: TenantId,
|
||||
new_timeline: TimelineId,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new();
|
||||
options.read(true)
|
||||
.write(true);
|
||||
let file = VirtualFile::open_with_options_async(path, options)
|
||||
.await
|
||||
.with_context(|| format!("Failed to open file '{}'", path))?;
|
||||
let file = FileBlockReader::new(file);
|
||||
let summary_blk = file.read_blk(0, ctx).await?;
|
||||
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
|
||||
let mut file = file.file;
|
||||
if actual_summary.magic != DELTA_FILE_MAGIC {
|
||||
bail!("File '{}' is not a delta layer", path);
|
||||
}
|
||||
let new_summary = Summary {
|
||||
tenant_id: new_tenant,
|
||||
timeline_id: new_timeline,
|
||||
..actual_summary
|
||||
};
|
||||
|
||||
let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new();
|
||||
Summary::ser_into(&new_summary, &mut buf)?;
|
||||
if buf.spilled() {
|
||||
// The code in ImageLayerWriterInner just warn!()s for this.
|
||||
// It should probably error out as well.
|
||||
anyhow::bail!(
|
||||
"Used more than one page size for summary buffer: {}",
|
||||
buf.len()
|
||||
);
|
||||
}
|
||||
file.seek(SeekFrom::Start(0)).await?;
|
||||
file.write_all(&buf).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl DeltaLayerInner {
|
||||
pub(super) async fn load(
|
||||
path: &Utf8Path,
|
||||
|
||||
@@ -436,6 +436,48 @@ impl ImageLayer {
|
||||
}
|
||||
}
|
||||
|
||||
impl ImageLayer {
|
||||
/// Assume the file at `path` is corrupt if this function returns with an error.
|
||||
pub(crate) async fn rewrite_tenant_timeline(
|
||||
path: &Utf8Path,
|
||||
new_tenant: TenantId,
|
||||
new_timeline: TimelineId,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new();
|
||||
options.read(true).write(true);
|
||||
let file = VirtualFile::open_with_options_async(path, options)
|
||||
.await
|
||||
.with_context(|| format!("Failed to open file '{}'", path))?;
|
||||
let file = FileBlockReader::new(file);
|
||||
let summary_blk = file.read_blk(0, ctx).await?;
|
||||
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
|
||||
let mut file = file.file;
|
||||
if actual_summary.magic != IMAGE_FILE_MAGIC {
|
||||
bail!("File '{}' is not a delta layer", path);
|
||||
}
|
||||
let new_summary = Summary {
|
||||
tenant_id: new_tenant,
|
||||
timeline_id: new_timeline,
|
||||
..actual_summary
|
||||
};
|
||||
|
||||
let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new();
|
||||
Summary::ser_into(&new_summary, &mut buf)?;
|
||||
if buf.spilled() {
|
||||
// The code in ImageLayerWriterInner just warn!()s for this.
|
||||
// It should probably error out as well.
|
||||
anyhow::bail!(
|
||||
"Used more than one page size for summary buffer: {}",
|
||||
buf.len()
|
||||
);
|
||||
}
|
||||
file.seek(SeekFrom::Start(0)).await?;
|
||||
file.write_all(&buf).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl ImageLayerInner {
|
||||
pub(super) async fn load(
|
||||
path: &Utf8Path,
|
||||
@@ -558,11 +600,11 @@ impl ImageLayerWriterInner {
|
||||
},
|
||||
);
|
||||
info!("new image layer {path}");
|
||||
let mut file = VirtualFile::open_with_options(
|
||||
&path,
|
||||
std::fs::OpenOptions::new().write(true).create_new(true),
|
||||
)
|
||||
.await?;
|
||||
let mut file = {
|
||||
let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new();
|
||||
options.write(true).create_new(true);
|
||||
VirtualFile::open_with_options_async(&path, options).await?
|
||||
};
|
||||
// make room for the header block
|
||||
file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
|
||||
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
|
||||
|
||||
@@ -11,14 +11,17 @@
|
||||
//! src/backend/storage/file/fd.c
|
||||
//!
|
||||
use crate::metrics::{StorageIoOperation, STORAGE_IO_SIZE, STORAGE_IO_TIME_METRIC};
|
||||
use crate::page_cache::PageWriteGuard;
|
||||
use crate::tenant::TENANTS_SEGMENT_NAME;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use once_cell::sync::OnceCell;
|
||||
use std::fs::{self, File, OpenOptions};
|
||||
use std::io::{Error, ErrorKind, Seek, SeekFrom};
|
||||
use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd};
|
||||
use std::os::unix::fs::FileExt;
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use std::sync::{RwLock, RwLockWriteGuard};
|
||||
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||
use tokio::time::Instant;
|
||||
|
||||
///
|
||||
/// A virtual file descriptor. You can use this just like std::fs::File, but internally
|
||||
@@ -52,7 +55,7 @@ pub struct VirtualFile {
|
||||
/// opened, in the VirtualFile::create() function, and strip the flag before
|
||||
/// storing it here.
|
||||
pub path: Utf8PathBuf,
|
||||
open_options: OpenOptions,
|
||||
open_options: tokio_epoll_uring::ops::open_at::OpenOptions,
|
||||
|
||||
// These are strings becase we only use them for metrics, and those expect strings.
|
||||
// It makes no sense for us to constantly turn the `TimelineId` and `TenantId` into
|
||||
@@ -110,7 +113,7 @@ impl OpenFiles {
|
||||
///
|
||||
/// On return, we hold a lock on the slot, and its 'tag' has been updated
|
||||
/// recently_used has been set. It's all ready for reuse.
|
||||
fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard<SlotInner>) {
|
||||
async fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard<SlotInner>) {
|
||||
//
|
||||
// Run the clock algorithm to find a slot to replace.
|
||||
//
|
||||
@@ -142,7 +145,7 @@ impl OpenFiles {
|
||||
}
|
||||
retries += 1;
|
||||
} else {
|
||||
slot_guard = slot.inner.write().unwrap();
|
||||
slot_guard = slot.inner.write().await;
|
||||
index = next;
|
||||
break;
|
||||
}
|
||||
@@ -153,7 +156,7 @@ impl OpenFiles {
|
||||
// old file.
|
||||
//
|
||||
if let Some(old_file) = slot_guard.file.take() {
|
||||
// the normal path of dropping VirtualFile uses "close", use "close-by-replace" here to
|
||||
// the normal path of dropping VirtualFile uses `Close`, use `CloseByReplace` here to
|
||||
// distinguish the two.
|
||||
STORAGE_IO_TIME_METRIC
|
||||
.get(StorageIoOperation::CloseByReplace)
|
||||
@@ -208,20 +211,43 @@ impl CrashsafeOverwriteError {
|
||||
}
|
||||
}
|
||||
|
||||
/// Observe duration for the given storage I/O operation
|
||||
///
|
||||
/// Unlike `observe_closure_duration`, this supports async,
|
||||
/// where "support" means that we measure wall clock time.
|
||||
macro_rules! observe_duration {
|
||||
($op:expr, $($body:tt)*) => {{
|
||||
let instant = Instant::now();
|
||||
let result = $($body)*;
|
||||
let elapsed = instant.elapsed().as_secs_f64();
|
||||
STORAGE_IO_TIME_METRIC
|
||||
.get($op)
|
||||
.observe(elapsed);
|
||||
result
|
||||
}}
|
||||
}
|
||||
|
||||
macro_rules! with_file {
|
||||
($this:expr, $op:expr, | $ident:ident | $($body:tt)*) => {{
|
||||
let $ident = $this.lock_file().await?;
|
||||
observe_duration!($op, $($body)*)
|
||||
}};
|
||||
}
|
||||
|
||||
impl VirtualFile {
|
||||
/// Open a file in read-only mode. Like File::open.
|
||||
pub async fn open(path: &Utf8Path) -> Result<VirtualFile, std::io::Error> {
|
||||
Self::open_with_options(path, OpenOptions::new().read(true)).await
|
||||
let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new();
|
||||
options.read(true);
|
||||
Self::open_with_options_async(path, options).await
|
||||
}
|
||||
|
||||
/// Create a new file for writing. If the file exists, it will be truncated.
|
||||
/// Like File::create.
|
||||
pub async fn create(path: &Utf8Path) -> Result<VirtualFile, std::io::Error> {
|
||||
Self::open_with_options(
|
||||
path,
|
||||
OpenOptions::new().write(true).create(true).truncate(true),
|
||||
)
|
||||
.await
|
||||
let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new();
|
||||
options.write(true).create(true).truncate(true);
|
||||
Self::open_with_options_async(path, options).await
|
||||
}
|
||||
|
||||
/// Open a file with given options.
|
||||
@@ -229,6 +255,7 @@ impl VirtualFile {
|
||||
/// Note: If any custom flags were set in 'open_options' through OpenOptionsExt,
|
||||
/// they will be applied also when the file is subsequently re-opened, not only
|
||||
/// on the first time. Make sure that's sane!
|
||||
#[cfg(test)]
|
||||
pub async fn open_with_options(
|
||||
path: &Utf8Path,
|
||||
open_options: &OpenOptions,
|
||||
@@ -244,11 +271,9 @@ impl VirtualFile {
|
||||
tenant_id = "*".to_string();
|
||||
timeline_id = "*".to_string();
|
||||
}
|
||||
let (handle, mut slot_guard) = get_open_files().find_victim_slot();
|
||||
let (handle, mut slot_guard) = get_open_files().find_victim_slot().await;
|
||||
|
||||
let file = STORAGE_IO_TIME_METRIC
|
||||
.get(StorageIoOperation::Open)
|
||||
.observe_closure_duration(|| open_options.open(path))?;
|
||||
let file = observe_duration!(StorageIoOperation::Open, open_options.open(path))?;
|
||||
|
||||
// Strip all options other than read and write.
|
||||
//
|
||||
@@ -293,16 +318,17 @@ impl VirtualFile {
|
||||
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
|
||||
Err(e) => return Err(CrashsafeOverwriteError::RemovePreviousTempfile(e)),
|
||||
}
|
||||
let mut file = Self::open_with_options(
|
||||
tmp_path,
|
||||
OpenOptions::new()
|
||||
let mut file = {
|
||||
let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new();
|
||||
options
|
||||
.write(true)
|
||||
// Use `create_new` so that, if we race with ourselves or something else,
|
||||
// we bail out instead of causing damage.
|
||||
.create_new(true),
|
||||
)
|
||||
.await
|
||||
.map_err(CrashsafeOverwriteError::CreateTempfile)?;
|
||||
.create_new(true);
|
||||
Self::open_with_options_async(tmp_path, options)
|
||||
.await
|
||||
.map_err(CrashsafeOverwriteError::CreateTempfile)?
|
||||
};
|
||||
file.write_all(content)
|
||||
.await
|
||||
.map_err(CrashsafeOverwriteError::WriteContents)?;
|
||||
@@ -318,10 +344,13 @@ impl VirtualFile {
|
||||
// the current `find_victim_slot` impl might pick the same slot for both
|
||||
// VirtualFile., and it eventually does a blocking write lock instead of
|
||||
// try_lock.
|
||||
let final_parent_dirfd =
|
||||
Self::open_with_options(final_path_parent, OpenOptions::new().read(true))
|
||||
let final_parent_dirfd = {
|
||||
let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new();
|
||||
options.read(true);
|
||||
Self::open_with_options_async(final_path_parent, options)
|
||||
.await
|
||||
.map_err(CrashsafeOverwriteError::OpenFinalPathParentDir)?;
|
||||
.map_err(CrashsafeOverwriteError::OpenFinalPathParentDir)?
|
||||
};
|
||||
final_parent_dirfd
|
||||
.sync_all()
|
||||
.await
|
||||
@@ -329,24 +358,86 @@ impl VirtualFile {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Open a file with given options.
|
||||
///
|
||||
/// Note: If any custom flags were set in 'open_options' through OpenOptionsExt,
|
||||
/// they will be applied also when the file is subsequently re-opened, not only
|
||||
/// on the first time. Make sure that's sane!
|
||||
pub async fn open_with_options_async(
|
||||
path: &Utf8Path,
|
||||
open_options: tokio_epoll_uring::ops::open_at::OpenOptions,
|
||||
) -> Result<VirtualFile, std::io::Error> {
|
||||
let path_str = path.to_string();
|
||||
let parts = path_str.split('/').collect::<Vec<&str>>();
|
||||
let tenant_id;
|
||||
let timeline_id;
|
||||
if parts.len() > 5 && parts[parts.len() - 5] == "tenants" {
|
||||
tenant_id = parts[parts.len() - 4].to_string();
|
||||
timeline_id = parts[parts.len() - 2].to_string();
|
||||
} else {
|
||||
tenant_id = "*".to_string();
|
||||
timeline_id = "*".to_string();
|
||||
}
|
||||
let (handle, mut slot_guard) = get_open_files().find_victim_slot().await;
|
||||
|
||||
let file = {
|
||||
let start = std::time::Instant::now();
|
||||
let system = tokio_epoll_uring::thread_local_system().await;
|
||||
let file: OwnedFd = system
|
||||
.open(path, &open_options)
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
tokio_epoll_uring::Error::Op(e) => e,
|
||||
tokio_epoll_uring::Error::System(system) => {
|
||||
std::io::Error::new(std::io::ErrorKind::Other, system)
|
||||
}
|
||||
})?;
|
||||
let file = File::from(file);
|
||||
file
|
||||
};
|
||||
|
||||
// Strip all options other than read and write.
|
||||
//
|
||||
// It would perhaps be nicer to check just for the read and write flags
|
||||
// explicitly, but OpenOptions doesn't contain any functions to read flags,
|
||||
// only to set them.
|
||||
let mut reopen_options = open_options;
|
||||
reopen_options.create(false);
|
||||
reopen_options.create_new(false);
|
||||
reopen_options.truncate(false);
|
||||
|
||||
let vfile = VirtualFile {
|
||||
handle: RwLock::new(handle),
|
||||
pos: 0,
|
||||
path: path.to_path_buf(),
|
||||
open_options: reopen_options,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
};
|
||||
|
||||
Ok(vfile)
|
||||
}
|
||||
|
||||
/// Call File::sync_all() on the underlying File.
|
||||
pub async fn sync_all(&self) -> Result<(), Error> {
|
||||
self.with_file(StorageIoOperation::Fsync, |file| file.sync_all())
|
||||
.await?
|
||||
with_file!(self, StorageIoOperation::Fsync, |file| file
|
||||
.as_ref()
|
||||
.sync_all())
|
||||
}
|
||||
|
||||
pub async fn metadata(&self) -> Result<fs::Metadata, Error> {
|
||||
self.with_file(StorageIoOperation::Metadata, |file| file.metadata())
|
||||
.await?
|
||||
with_file!(self, StorageIoOperation::Metadata, |file| file
|
||||
.as_ref()
|
||||
.metadata())
|
||||
}
|
||||
|
||||
/// Helper function that looks up the underlying File for this VirtualFile,
|
||||
/// opening it and evicting some other File if necessary. It calls 'func'
|
||||
/// with the physical File.
|
||||
async fn with_file<F, R>(&self, op: StorageIoOperation, mut func: F) -> Result<R, Error>
|
||||
where
|
||||
F: FnMut(&File) -> R,
|
||||
{
|
||||
/// Helper function internal to `VirtualFile` that looks up the underlying File,
|
||||
/// opens it and evicts some other File if necessary. The passed parameter is
|
||||
/// assumed to be a function available for the physical `File`.
|
||||
///
|
||||
/// We are doing it via a macro as Rust doesn't support async closures that
|
||||
/// take on parameters with lifetimes.
|
||||
async fn lock_file(&self) -> Result<FileGuard<'static>, Error> {
|
||||
let open_files = get_open_files();
|
||||
|
||||
let mut handle_guard = {
|
||||
@@ -356,27 +447,23 @@ impl VirtualFile {
|
||||
// We only need to hold the handle lock while we read the current handle. If
|
||||
// another thread closes the file and recycles the slot for a different file,
|
||||
// we will notice that the handle we read is no longer valid and retry.
|
||||
let mut handle = *self.handle.read().unwrap();
|
||||
let mut handle = *self.handle.read().await;
|
||||
loop {
|
||||
// Check if the slot contains our File
|
||||
{
|
||||
let slot = &open_files.slots[handle.index];
|
||||
let slot_guard = slot.inner.read().unwrap();
|
||||
if slot_guard.tag == handle.tag {
|
||||
if let Some(file) = &slot_guard.file {
|
||||
// Found a cached file descriptor.
|
||||
slot.recently_used.store(true, Ordering::Relaxed);
|
||||
return Ok(STORAGE_IO_TIME_METRIC
|
||||
.get(op)
|
||||
.observe_closure_duration(|| func(file)));
|
||||
}
|
||||
let slot_guard = slot.inner.read().await;
|
||||
if slot_guard.tag == handle.tag && slot_guard.file.is_some() {
|
||||
// Found a cached file descriptor.
|
||||
slot.recently_used.store(true, Ordering::Relaxed);
|
||||
return Ok(FileGuard { slot_guard });
|
||||
}
|
||||
}
|
||||
|
||||
// The slot didn't contain our File. We will have to open it ourselves,
|
||||
// but before that, grab a write lock on handle in the VirtualFile, so
|
||||
// that no other thread will try to concurrently open the same file.
|
||||
let handle_guard = self.handle.write().unwrap();
|
||||
let handle_guard = self.handle.write().await;
|
||||
|
||||
// If another thread changed the handle while we were not holding the lock,
|
||||
// then the handle might now be valid again. Loop back to retry.
|
||||
@@ -390,17 +477,24 @@ impl VirtualFile {
|
||||
|
||||
// We need to open the file ourselves. The handle in the VirtualFile is
|
||||
// now locked in write-mode. Find a free slot to put it in.
|
||||
let (handle, mut slot_guard) = open_files.find_victim_slot();
|
||||
let (handle, mut slot_guard) = open_files.find_victim_slot().await;
|
||||
|
||||
// Open the physical file
|
||||
let file = STORAGE_IO_TIME_METRIC
|
||||
.get(StorageIoOperation::Open)
|
||||
.observe_closure_duration(|| self.open_options.open(&self.path))?;
|
||||
|
||||
// Perform the requested operation on it
|
||||
let result = STORAGE_IO_TIME_METRIC
|
||||
.get(op)
|
||||
.observe_closure_duration(|| func(&file));
|
||||
let file = {
|
||||
let system = tokio_epoll_uring::thread_local_system().await;
|
||||
let file: OwnedFd =
|
||||
system
|
||||
.open(&self.path, &self.open_options)
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
tokio_epoll_uring::Error::Op(e) => e,
|
||||
tokio_epoll_uring::Error::System(system) => {
|
||||
std::io::Error::new(std::io::ErrorKind::Other, system)
|
||||
}
|
||||
})?;
|
||||
let file = File::from(file);
|
||||
file
|
||||
};
|
||||
|
||||
// Store the File in the slot and update the handle in the VirtualFile
|
||||
// to point to it.
|
||||
@@ -408,7 +502,9 @@ impl VirtualFile {
|
||||
|
||||
*handle_guard = handle;
|
||||
|
||||
Ok(result)
|
||||
return Ok(FileGuard {
|
||||
slot_guard: slot_guard.downgrade(),
|
||||
});
|
||||
}
|
||||
|
||||
pub fn remove(self) {
|
||||
@@ -423,11 +519,9 @@ impl VirtualFile {
|
||||
self.pos = offset;
|
||||
}
|
||||
SeekFrom::End(offset) => {
|
||||
self.pos = self
|
||||
.with_file(StorageIoOperation::Seek, |mut file| {
|
||||
file.seek(SeekFrom::End(offset))
|
||||
})
|
||||
.await??
|
||||
self.pos = with_file!(self, StorageIoOperation::Seek, |file| file
|
||||
.as_ref()
|
||||
.seek(SeekFrom::End(offset)))?
|
||||
}
|
||||
SeekFrom::Current(offset) => {
|
||||
let pos = self.pos as i128 + offset as i128;
|
||||
@@ -447,24 +541,59 @@ impl VirtualFile {
|
||||
}
|
||||
|
||||
// Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135
|
||||
pub async fn read_exact_at(&self, mut buf: &mut [u8], mut offset: u64) -> Result<(), Error> {
|
||||
while !buf.is_empty() {
|
||||
match self.read_at(buf, offset).await {
|
||||
Ok(0) => {
|
||||
return Err(Error::new(
|
||||
std::io::ErrorKind::UnexpectedEof,
|
||||
"failed to fill whole buffer",
|
||||
))
|
||||
}
|
||||
Ok(n) => {
|
||||
buf = &mut buf[n..];
|
||||
offset += n as u64;
|
||||
}
|
||||
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
|
||||
Err(e) => return Err(e),
|
||||
pub async fn read_exact_at(
|
||||
&self,
|
||||
mut write_guard: PageWriteGuard<'static>,
|
||||
mut offset: u64,
|
||||
) -> Result<PageWriteGuard<'static>, Error> {
|
||||
let file_guard: FileGuard<'static> = self.lock_file().await?;
|
||||
|
||||
let system = tokio_epoll_uring::thread_local_system().await;
|
||||
struct PageWriteGuardBuf {
|
||||
buf: PageWriteGuard<'static>,
|
||||
init_up_to: usize,
|
||||
}
|
||||
unsafe impl tokio_epoll_uring::IoBuf for PageWriteGuardBuf {
|
||||
fn stable_ptr(&self) -> *const u8 {
|
||||
self.buf.as_ptr()
|
||||
}
|
||||
fn bytes_init(&self) -> usize {
|
||||
self.init_up_to
|
||||
}
|
||||
fn bytes_total(&self) -> usize {
|
||||
self.buf.len()
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
unsafe impl tokio_epoll_uring::IoBufMut for PageWriteGuardBuf {
|
||||
fn stable_mut_ptr(&mut self) -> *mut u8 {
|
||||
self.buf.as_mut_ptr()
|
||||
}
|
||||
|
||||
unsafe fn set_init(&mut self, pos: usize) {
|
||||
assert!(pos <= self.buf.len());
|
||||
self.init_up_to = pos;
|
||||
}
|
||||
}
|
||||
let buf = PageWriteGuardBuf {
|
||||
buf: write_guard,
|
||||
init_up_to: 0,
|
||||
};
|
||||
let owned_fd = unsafe { OwnedFd::from_raw_fd(file_guard.as_ref().as_raw_fd()) };
|
||||
let guard = scopeguard::guard(file_guard, |_| {
|
||||
panic!("must not drop future while operation is ongoing (todo: pass file_guard to tokio_epoll_uring to aovid this)")
|
||||
});
|
||||
let ((owned_fd, buf), res) = system.read(owned_fd, offset, buf).await;
|
||||
let _ = OwnedFd::into_raw_fd(owned_fd);
|
||||
let _ = scopeguard::ScopeGuard::into_inner(guard);
|
||||
let PageWriteGuardBuf {
|
||||
buf: write_guard,
|
||||
init_up_to,
|
||||
} = buf;
|
||||
if let Ok(num_read) = res {
|
||||
assert!(init_up_to == num_read); // TODO need to deal with short reads here
|
||||
}
|
||||
res.map(|_| write_guard)
|
||||
.map_err(|e| Error::new(ErrorKind::Other, e))
|
||||
}
|
||||
|
||||
// Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235
|
||||
@@ -514,22 +643,10 @@ impl VirtualFile {
|
||||
Ok(n)
|
||||
}
|
||||
|
||||
pub async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<usize, Error> {
|
||||
let result = self
|
||||
.with_file(StorageIoOperation::Read, |file| file.read_at(buf, offset))
|
||||
.await?;
|
||||
if let Ok(size) = result {
|
||||
STORAGE_IO_SIZE
|
||||
.with_label_values(&["read", &self.tenant_id, &self.timeline_id])
|
||||
.add(size as i64);
|
||||
}
|
||||
result
|
||||
}
|
||||
|
||||
async fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> {
|
||||
let result = self
|
||||
.with_file(StorageIoOperation::Write, |file| file.write_at(buf, offset))
|
||||
.await?;
|
||||
let result = with_file!(self, StorageIoOperation::Write, |file| file
|
||||
.as_ref()
|
||||
.write_at(buf, offset));
|
||||
if let Ok(size) = result {
|
||||
STORAGE_IO_SIZE
|
||||
.with_label_values(&["write", &self.tenant_id, &self.timeline_id])
|
||||
@@ -539,6 +656,18 @@ impl VirtualFile {
|
||||
}
|
||||
}
|
||||
|
||||
struct FileGuard<'a> {
|
||||
slot_guard: RwLockReadGuard<'a, SlotInner>,
|
||||
}
|
||||
|
||||
impl<'a> AsRef<File> for FileGuard<'a> {
|
||||
fn as_ref(&self) -> &File {
|
||||
// This unwrap is safe because we only create `FileGuard`s
|
||||
// if we know that the file is Some.
|
||||
self.slot_guard.file.as_ref().unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
impl VirtualFile {
|
||||
pub(crate) async fn read_blk(
|
||||
@@ -571,20 +700,39 @@ impl VirtualFile {
|
||||
impl Drop for VirtualFile {
|
||||
/// If a VirtualFile is dropped, close the underlying file if it was open.
|
||||
fn drop(&mut self) {
|
||||
let handle = self.handle.get_mut().unwrap();
|
||||
let handle = self.handle.get_mut();
|
||||
|
||||
// We could check with a read-lock first, to avoid waiting on an
|
||||
// unrelated I/O.
|
||||
let slot = &get_open_files().slots[handle.index];
|
||||
let mut slot_guard = slot.inner.write().unwrap();
|
||||
if slot_guard.tag == handle.tag {
|
||||
slot.recently_used.store(false, Ordering::Relaxed);
|
||||
// there is also operation "close-by-replace" for closes done on eviction for
|
||||
// comparison.
|
||||
STORAGE_IO_TIME_METRIC
|
||||
.get(StorageIoOperation::Close)
|
||||
.observe_closure_duration(|| drop(slot_guard.file.take()));
|
||||
fn clean_slot(slot: &Slot, mut slot_guard: RwLockWriteGuard<'_, SlotInner>, tag: u64) {
|
||||
if slot_guard.tag == tag {
|
||||
slot.recently_used.store(false, Ordering::Relaxed);
|
||||
// there is also the `CloseByReplace` operation for closes done on eviction for
|
||||
// comparison.
|
||||
STORAGE_IO_TIME_METRIC
|
||||
.get(StorageIoOperation::Close)
|
||||
.observe_closure_duration(|| drop(slot_guard.file.take()));
|
||||
}
|
||||
}
|
||||
|
||||
// We don't have async drop so we cannot directly await the lock here.
|
||||
// Instead, first do a best-effort attempt at closing the underlying
|
||||
// file descriptor by using `try_write`, and if that fails, spawn
|
||||
// a tokio task to do it asynchronously: we just want it to be
|
||||
// cleaned up eventually.
|
||||
// Most of the time, the `try_lock` should succeed though,
|
||||
// as we have `&mut self` access. In other words, if the slot
|
||||
// is still occupied by our file, there should be no access from
|
||||
// other I/O operations; the only other possible place to lock
|
||||
// the slot is the lock algorithm looking for free slots.
|
||||
let slot = &get_open_files().slots[handle.index];
|
||||
if let Ok(slot_guard) = slot.inner.try_write() {
|
||||
clean_slot(slot, slot_guard, handle.tag);
|
||||
} else {
|
||||
let tag = handle.tag;
|
||||
tokio::spawn(async move {
|
||||
let slot_guard = slot.inner.write().await;
|
||||
clean_slot(slot, slot_guard, tag);
|
||||
});
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
43
test_runner/duplicate_tenant.py
Normal file
43
test_runner/duplicate_tenant.py
Normal file
@@ -0,0 +1,43 @@
|
||||
# Usage from top of repo:
|
||||
# poetry run python3 test_runner/duplicate_tenant.py b97965931096047b2d54958756baee7b 10
|
||||
from queue import Queue
|
||||
import sys
|
||||
import threading
|
||||
|
||||
import requests
|
||||
from fixtures.pageserver.http import PageserverHttpClient
|
||||
from fixtures.types import TenantId
|
||||
|
||||
initial_tenant = sys.argv[1]
|
||||
ncopies = int(sys.argv[2])
|
||||
numthreads = int(sys.argv[3])
|
||||
|
||||
|
||||
# class DuckTypedNeonEnv:
|
||||
# pass
|
||||
|
||||
|
||||
# cli = NeonCli(DuckTypedNeonEnv())
|
||||
|
||||
q = Queue()
|
||||
for i in range(0, ncopies):
|
||||
q.put(i)
|
||||
|
||||
for i in range(0, numthreads):
|
||||
q.put(None)
|
||||
|
||||
|
||||
def create():
|
||||
while True:
|
||||
if q.get() == None:
|
||||
break
|
||||
new_tenant = TenantId.generate()
|
||||
res = requests.post(
|
||||
f"http://localhost:9898/v1/tenant/{initial_tenant}/duplicate",
|
||||
json={"new_tenant_id": str(new_tenant)},
|
||||
)
|
||||
res.raise_for_status()
|
||||
|
||||
|
||||
for i in range(0, numthreads):
|
||||
threading.Thread(target=create).start()
|
||||
@@ -215,6 +215,25 @@ class PageserverHttpClient(requests.Session):
|
||||
assert isinstance(new_tenant_id, str)
|
||||
return TenantId(new_tenant_id)
|
||||
|
||||
def tenant_duplicate(
|
||||
self, src_tenant_id: TenantId, new_tenant_id: TenantId, conf: Optional[Dict[str, Any]] = None
|
||||
) -> TenantId:
|
||||
if conf is not None:
|
||||
assert "new_tenant_id" not in conf.keys()
|
||||
res = self.post(
|
||||
f"http://localhost:{self.port}/v1/tenant/{src_tenant_id}/duplicate",
|
||||
json={
|
||||
"new_tenant_id": str(new_tenant_id),
|
||||
**(conf or {}),
|
||||
},
|
||||
)
|
||||
self.verbose_error(res)
|
||||
if res.status_code == 409:
|
||||
raise Exception(f"could not create tenant: already exists for id {new_tenant_id}")
|
||||
new_tenant_id = res.json()
|
||||
assert isinstance(new_tenant_id, str)
|
||||
return TenantId(new_tenant_id)
|
||||
|
||||
def tenant_attach(
|
||||
self,
|
||||
tenant_id: TenantId,
|
||||
|
||||
54
test_runner/regress/test_tenant_duplicate.py
Normal file
54
test_runner/regress/test_tenant_duplicate.py
Normal file
@@ -0,0 +1,54 @@
|
||||
import time
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
last_flush_lsn_upload,
|
||||
)
|
||||
from fixtures.remote_storage import (
|
||||
RemoteStorageKind,
|
||||
)
|
||||
from fixtures.types import TenantId
|
||||
from fixtures.log_helper import log
|
||||
|
||||
|
||||
def test_tenant_duplicate(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
):
|
||||
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
with env.endpoints.create_start("main", tenant_id=env.initial_tenant) as ep_main:
|
||||
ep_main.safe_psql("CREATE TABLE foo (i int);")
|
||||
ep_main.safe_psql("INSERT INTO foo VALUES (1), (2), (3);")
|
||||
last_flush_lsn = last_flush_lsn_upload(
|
||||
env, ep_main, env.initial_tenant, env.initial_timeline
|
||||
)
|
||||
|
||||
new_tenant_id = TenantId.generate()
|
||||
# timeline id remains unchanged with tenant_duplicate
|
||||
# TODO: implement a remapping scheme so timeline ids remain globally unique
|
||||
new_timeline_id = env.initial_timeline
|
||||
|
||||
log.info(f"Duplicate tenant/timeline will be: {new_tenant_id}/{new_timeline_id}")
|
||||
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
ps_http.tenant_duplicate(env.initial_tenant, new_tenant_id)
|
||||
|
||||
ps_http.tenant_delete(env.initial_tenant)
|
||||
|
||||
env.neon_cli.map_branch("duplicate", new_tenant_id, new_timeline_id)
|
||||
|
||||
# start read-only replicate and validate
|
||||
with env.endpoints.create_start(
|
||||
"duplicate", tenant_id=new_tenant_id, lsn=last_flush_lsn
|
||||
) as ep_dup:
|
||||
with ep_dup.connect() as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("SELECT * FROM foo ORDER BY i;")
|
||||
cur.fetchall() == [(1,), (2,), (3,)]
|
||||
|
||||
# ensure restarting PS works
|
||||
env.pageserver.stop()
|
||||
env.pageserver.start()
|
||||
|
||||
Reference in New Issue
Block a user