mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-20 14:40:37 +00:00
Compare commits
23 Commits
arpad/less
...
jcsp/split
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ed2be78861 | ||
|
|
577982b778 | ||
|
|
574645412b | ||
|
|
11945e64ec | ||
|
|
cddafc79e1 | ||
|
|
af7cca4949 | ||
|
|
89cae64e38 | ||
|
|
1f417af9fd | ||
|
|
1684bbf162 | ||
|
|
90cadfa986 | ||
|
|
2226acef7c | ||
|
|
24ce878039 | ||
|
|
84914434e3 | ||
|
|
b655c7030f | ||
|
|
3695a1efa1 | ||
|
|
75b4440d07 | ||
|
|
ee3437cbd8 | ||
|
|
dbe0aa653a | ||
|
|
39427925c2 | ||
|
|
af43f78561 | ||
|
|
ed57772793 | ||
|
|
f1de18f1c9 | ||
|
|
dbb0c967d5 |
4
.github/workflows/build_and_test.yml
vendored
4
.github/workflows/build_and_test.yml
vendored
@@ -478,6 +478,7 @@ jobs:
|
||||
PAGESERVER_VIRTUAL_FILE_IO_ENGINE: tokio-epoll-uring
|
||||
PAGESERVER_GET_VECTORED_IMPL: vectored
|
||||
PAGESERVER_GET_IMPL: vectored
|
||||
PAGESERVER_VALIDATE_VEC_GET: true
|
||||
|
||||
# Temporary disable this step until we figure out why it's so flaky
|
||||
# Ref https://github.com/neondatabase/neon/issues/4540
|
||||
@@ -557,6 +558,9 @@ jobs:
|
||||
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
|
||||
TEST_RESULT_CONNSTR: "${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}"
|
||||
PAGESERVER_VIRTUAL_FILE_IO_ENGINE: tokio-epoll-uring
|
||||
PAGESERVER_GET_VECTORED_IMPL: vectored
|
||||
PAGESERVER_GET_IMPL: vectored
|
||||
PAGESERVER_VALIDATE_VEC_GET: false
|
||||
# XXX: no coverage data handling here, since benchmarks are run on release builds,
|
||||
# while coverage is currently collected for the debug ones
|
||||
|
||||
|
||||
66
Cargo.lock
generated
66
Cargo.lock
generated
@@ -722,9 +722,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "azure_core"
|
||||
version = "0.18.0"
|
||||
version = "0.19.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a6218987c374650fdad0b476bfc675729762c28dfb35f58608a38a2b1ea337dd"
|
||||
checksum = "70fd680c0d0424a518229b1150922f92653ba2ac933aa000abc8bf1ca08105f7"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"base64 0.21.1",
|
||||
@@ -752,9 +752,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "azure_identity"
|
||||
version = "0.18.1"
|
||||
version = "0.19.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9e1eacc4f7fb2a73d57c39139d0fc3aed78435606055779ddaef4b43cdf919a8"
|
||||
checksum = "a6d2060f5b2e1c664026ca4edd561306c473be887c1f7a81f10bf06f9b71c63f"
|
||||
dependencies = [
|
||||
"async-lock",
|
||||
"async-trait",
|
||||
@@ -772,9 +772,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "azure_storage"
|
||||
version = "0.18.0"
|
||||
version = "0.19.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ade8f2653e408de88b9eafec9f48c3c26b94026375e88adbd34523a7dd9795a1"
|
||||
checksum = "15d3da73bfa09350e1bd6ae2a260806fcf90048c7e78cd2d8f88be60b19a7266"
|
||||
dependencies = [
|
||||
"RustyXML",
|
||||
"async-lock",
|
||||
@@ -791,9 +791,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "azure_storage_blobs"
|
||||
version = "0.18.0"
|
||||
version = "0.19.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "025701c7cc5b523100f0f3b2b01723564ec5a86c03236521c06826337047e872"
|
||||
checksum = "149c21834a4105d761e3dd33d91c2a3064acc05a3c978848ea8089102ae45c94"
|
||||
dependencies = [
|
||||
"RustyXML",
|
||||
"azure_core",
|
||||
@@ -812,9 +812,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "azure_svc_blobstorage"
|
||||
version = "0.18.0"
|
||||
version = "0.19.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "76051e5bb67cea1055abe5e530a0878feac7e0ab4cbbcb4a6adc953a58993389"
|
||||
checksum = "88c888b7bf522d5405218b8613bf0fae7ddaae6ef3bf4ad42ae005993c96ab8b"
|
||||
dependencies = [
|
||||
"azure_core",
|
||||
"bytes",
|
||||
@@ -2763,9 +2763,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "js-sys"
|
||||
version = "0.3.63"
|
||||
version = "0.3.69"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2f37a4a5928311ac501dee68b3c7613a1037d0edb30c8e5427bd832d55d1b790"
|
||||
checksum = "29c15563dc2726973df627357ce0c9ddddbea194836909d655df6a75d2cf296d"
|
||||
dependencies = [
|
||||
"wasm-bindgen",
|
||||
]
|
||||
@@ -5085,6 +5085,7 @@ dependencies = [
|
||||
"aws-smithy-async",
|
||||
"bincode",
|
||||
"bytes",
|
||||
"camino",
|
||||
"chrono",
|
||||
"clap",
|
||||
"crc32c",
|
||||
@@ -6412,11 +6413,10 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tracing"
|
||||
version = "0.1.37"
|
||||
version = "0.1.40"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8"
|
||||
checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"log",
|
||||
"pin-project-lite",
|
||||
"tracing-attributes",
|
||||
@@ -6436,9 +6436,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tracing-attributes"
|
||||
version = "0.1.24"
|
||||
version = "0.1.27"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0f57e3ca2a01450b1a921183a9c9cbfda207fd822cef4ccb00a65402cbba7a74"
|
||||
checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
@@ -6447,9 +6447,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tracing-core"
|
||||
version = "0.1.31"
|
||||
version = "0.1.32"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a"
|
||||
checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54"
|
||||
dependencies = [
|
||||
"once_cell",
|
||||
"valuable",
|
||||
@@ -6904,9 +6904,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
|
||||
|
||||
[[package]]
|
||||
name = "wasm-bindgen"
|
||||
version = "0.2.86"
|
||||
version = "0.2.92"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5bba0e8cb82ba49ff4e229459ff22a191bbe9a1cb3a341610c9c33efc27ddf73"
|
||||
checksum = "4be2531df63900aeb2bca0daaaddec08491ee64ceecbee5076636a3b026795a8"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"wasm-bindgen-macro",
|
||||
@@ -6914,9 +6914,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "wasm-bindgen-backend"
|
||||
version = "0.2.86"
|
||||
version = "0.2.92"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "19b04bc93f9d6bdee709f6bd2118f57dd6679cf1176a1af464fca3ab0d66d8fb"
|
||||
checksum = "614d787b966d3989fa7bb98a654e369c762374fd3213d212cfc0251257e747da"
|
||||
dependencies = [
|
||||
"bumpalo",
|
||||
"log",
|
||||
@@ -6929,9 +6929,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "wasm-bindgen-futures"
|
||||
version = "0.4.36"
|
||||
version = "0.4.42"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2d1985d03709c53167ce907ff394f5316aa22cb4e12761295c5dc57dacb6297e"
|
||||
checksum = "76bc14366121efc8dbb487ab05bcc9d346b3b5ec0eaa76e46594cabbe51762c0"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"js-sys",
|
||||
@@ -6941,9 +6941,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "wasm-bindgen-macro"
|
||||
version = "0.2.86"
|
||||
version = "0.2.92"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "14d6b024f1a526bb0234f52840389927257beb670610081360e5a03c5df9c258"
|
||||
checksum = "a1f8823de937b71b9460c0c34e25f3da88250760bec0ebac694b49997550d726"
|
||||
dependencies = [
|
||||
"quote",
|
||||
"wasm-bindgen-macro-support",
|
||||
@@ -6951,9 +6951,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "wasm-bindgen-macro-support"
|
||||
version = "0.2.86"
|
||||
version = "0.2.92"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e128beba882dd1eb6200e1dc92ae6c5dbaa4311aa7bb211ca035779e5efc39f8"
|
||||
checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
@@ -6964,9 +6964,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "wasm-bindgen-shared"
|
||||
version = "0.2.86"
|
||||
version = "0.2.92"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ed9d5b4305409d1fc9482fee2d7f9bcbf24b3972bf59817ef757e23982242a93"
|
||||
checksum = "af190c94f2773fdb3729c55b007a722abb5384da03bc0986df4c289bf5567e96"
|
||||
|
||||
[[package]]
|
||||
name = "wasm-streams"
|
||||
@@ -6998,9 +6998,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "web-sys"
|
||||
version = "0.3.63"
|
||||
version = "0.3.69"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3bdd9ef4e984da1187bf8110c5cf5b845fbc87a23602cdf912386a76fcd3a7c2"
|
||||
checksum = "77afa9a11836342370f4817622a2f0f418b134426d91a82dfb48f532d2ec13ef"
|
||||
dependencies = [
|
||||
"js-sys",
|
||||
"wasm-bindgen",
|
||||
|
||||
@@ -45,10 +45,10 @@ anyhow = { version = "1.0", features = ["backtrace"] }
|
||||
arc-swap = "1.6"
|
||||
async-compression = { version = "0.4.0", features = ["tokio", "gzip", "zstd"] }
|
||||
atomic-take = "1.1.0"
|
||||
azure_core = "0.18"
|
||||
azure_identity = "0.18"
|
||||
azure_storage = "0.18"
|
||||
azure_storage_blobs = "0.18"
|
||||
azure_core = "0.19"
|
||||
azure_identity = "0.19"
|
||||
azure_storage = "0.19"
|
||||
azure_storage_blobs = "0.19"
|
||||
flate2 = "1.0.26"
|
||||
async-stream = "0.3"
|
||||
async-trait = "0.1"
|
||||
|
||||
@@ -417,6 +417,54 @@ async fn handle_tenant(
|
||||
println!("{} {:?}", t.id, t.state);
|
||||
}
|
||||
}
|
||||
Some(("import", import_match)) => {
|
||||
let tenant_id = parse_tenant_id(import_match)?.unwrap_or_else(TenantId::generate);
|
||||
|
||||
let storage_controller = StorageController::from_env(env);
|
||||
let create_response = storage_controller.tenant_import(tenant_id).await?;
|
||||
|
||||
let shard_zero = create_response
|
||||
.shards
|
||||
.first()
|
||||
.expect("Import response omitted shards");
|
||||
|
||||
let attached_pageserver_id = shard_zero.node_id;
|
||||
let pageserver =
|
||||
PageServerNode::from_env(env, env.get_pageserver_conf(attached_pageserver_id)?);
|
||||
|
||||
println!(
|
||||
"Imported tenant {tenant_id}, attached to pageserver {attached_pageserver_id}"
|
||||
);
|
||||
|
||||
let timelines = pageserver
|
||||
.http_client
|
||||
.list_timelines(shard_zero.shard_id)
|
||||
.await?;
|
||||
|
||||
// Pick a 'main' timeline that has no ancestors, the rest will get arbitrary names
|
||||
let main_timeline = timelines
|
||||
.iter()
|
||||
.find(|t| t.ancestor_timeline_id.is_none())
|
||||
.expect("No timelines found")
|
||||
.timeline_id;
|
||||
|
||||
let mut branch_i = 0;
|
||||
for timeline in timelines.iter() {
|
||||
let branch_name = if timeline.timeline_id == main_timeline {
|
||||
"main".to_string()
|
||||
} else {
|
||||
branch_i += 1;
|
||||
format!("branch_{branch_i}")
|
||||
};
|
||||
|
||||
println!(
|
||||
"Importing timeline {tenant_id}/{} as branch {branch_name}",
|
||||
timeline.timeline_id
|
||||
);
|
||||
|
||||
env.register_branch_mapping(branch_name, tenant_id, timeline.timeline_id)?;
|
||||
}
|
||||
}
|
||||
Some(("create", create_match)) => {
|
||||
let tenant_conf: HashMap<_, _> = create_match
|
||||
.get_many::<String>("config")
|
||||
@@ -1480,6 +1528,8 @@ fn cli() -> Command {
|
||||
.subcommand(Command::new("config")
|
||||
.arg(tenant_id_arg.clone())
|
||||
.arg(Arg::new("config").short('c').num_args(1).action(ArgAction::Append).required(false)))
|
||||
.subcommand(Command::new("import").arg(tenant_id_arg.clone().required(true))
|
||||
.about("Import a tenant that is present in remote storage, and create branches for its timelines"))
|
||||
)
|
||||
.subcommand(
|
||||
Command::new("pageserver")
|
||||
|
||||
@@ -130,6 +130,7 @@ pub struct PageServerConf {
|
||||
pub(crate) virtual_file_io_engine: Option<String>,
|
||||
pub(crate) get_vectored_impl: Option<String>,
|
||||
pub(crate) get_impl: Option<String>,
|
||||
pub(crate) validate_vectored_get: Option<bool>,
|
||||
}
|
||||
|
||||
impl Default for PageServerConf {
|
||||
@@ -143,6 +144,7 @@ impl Default for PageServerConf {
|
||||
virtual_file_io_engine: None,
|
||||
get_vectored_impl: None,
|
||||
get_impl: None,
|
||||
validate_vectored_get: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -93,6 +93,7 @@ impl PageServerNode {
|
||||
virtual_file_io_engine,
|
||||
get_vectored_impl,
|
||||
get_impl,
|
||||
validate_vectored_get,
|
||||
} = &self.conf;
|
||||
|
||||
let id = format!("id={}", id);
|
||||
@@ -117,6 +118,11 @@ impl PageServerNode {
|
||||
} else {
|
||||
String::new()
|
||||
};
|
||||
let validate_vectored_get = if let Some(validate_vectored_get) = validate_vectored_get {
|
||||
format!("validate_vectored_get={validate_vectored_get}")
|
||||
} else {
|
||||
String::new()
|
||||
};
|
||||
|
||||
let broker_endpoint_param = format!("broker_endpoint='{}'", self.env.broker.client_url());
|
||||
|
||||
@@ -131,6 +137,7 @@ impl PageServerNode {
|
||||
virtual_file_io_engine,
|
||||
get_vectored_impl,
|
||||
get_impl,
|
||||
validate_vectored_get,
|
||||
];
|
||||
|
||||
if let Some(control_plane_api) = &self.env.control_plane_api {
|
||||
@@ -441,6 +448,11 @@ impl PageServerNode {
|
||||
.map(serde_json::from_str)
|
||||
.transpose()
|
||||
.context("parse `timeline_get_throttle` from json")?,
|
||||
switch_to_aux_file_v2: settings
|
||||
.remove("switch_to_aux_file_v2")
|
||||
.map(|x| x.parse::<bool>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'switch_to_aux_file_v2' as bool")?,
|
||||
};
|
||||
if !settings.is_empty() {
|
||||
bail!("Unrecognized tenant settings: {settings:?}")
|
||||
@@ -559,6 +571,11 @@ impl PageServerNode {
|
||||
.map(serde_json::from_str)
|
||||
.transpose()
|
||||
.context("parse `timeline_get_throttle` from json")?,
|
||||
switch_to_aux_file_v2: settings
|
||||
.remove("switch_to_aux_file_v2")
|
||||
.map(|x| x.parse::<bool>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'switch_to_aux_file_v2' as bool")?,
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -472,6 +472,16 @@ impl StorageController {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
pub async fn tenant_import(&self, tenant_id: TenantId) -> anyhow::Result<TenantCreateResponse> {
|
||||
self.dispatch::<(), TenantCreateResponse>(
|
||||
Method::POST,
|
||||
format!("debug/v1/tenant/{tenant_id}/import"),
|
||||
None,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
pub async fn tenant_locate(&self, tenant_id: TenantId) -> anyhow::Result<TenantLocateResponse> {
|
||||
self.dispatch::<(), _>(
|
||||
|
||||
@@ -4,7 +4,6 @@ use bytes::BufMut;
|
||||
use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
|
||||
use postgres_ffi::{Oid, TransactionId};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::ops::RangeInclusive;
|
||||
use std::{fmt, ops::Range};
|
||||
|
||||
use crate::reltag::{BlockNumber, RelTag, SlruKind};
|
||||
@@ -30,24 +29,25 @@ pub const KEY_SIZE: usize = 18;
|
||||
/// See [`Key::to_i128`] for more information on the encoding.
|
||||
pub const METADATA_KEY_SIZE: usize = 16;
|
||||
|
||||
/// The key prefix start range for the metadata keys. All keys with the first byte >= 0x80 is a metadata key.
|
||||
pub const METADATA_KEY_BEGIN_PREFIX: u8 = 0x80;
|
||||
/// The key prefix start range for the metadata keys. All keys with the first byte >= 0x40 is a metadata key.
|
||||
pub const METADATA_KEY_BEGIN_PREFIX: u8 = 0x60;
|
||||
pub const METADATA_KEY_END_PREFIX: u8 = 0x7F;
|
||||
|
||||
/// The (reserved) key prefix of relation sizes.
|
||||
pub const RELATION_SIZE_PREFIX: u8 = 0x81;
|
||||
pub const RELATION_SIZE_PREFIX: u8 = 0x61;
|
||||
|
||||
/// The key prefix of AUX file keys.
|
||||
pub const AUX_KEY_PREFIX: u8 = 0x82;
|
||||
pub const AUX_KEY_PREFIX: u8 = 0x62;
|
||||
|
||||
/// Check if the key falls in the range of metadata keys.
|
||||
pub const fn is_metadata_key_slice(key: &[u8]) -> bool {
|
||||
key[0] >= METADATA_KEY_BEGIN_PREFIX
|
||||
key[0] >= METADATA_KEY_BEGIN_PREFIX && key[0] < METADATA_KEY_END_PREFIX
|
||||
}
|
||||
|
||||
impl Key {
|
||||
/// Check if the key falls in the range of metadata keys.
|
||||
pub const fn is_metadata_key(&self) -> bool {
|
||||
self.field1 >= METADATA_KEY_BEGIN_PREFIX
|
||||
self.field1 >= METADATA_KEY_BEGIN_PREFIX && self.field1 < METADATA_KEY_END_PREFIX
|
||||
}
|
||||
|
||||
/// Encode a metadata key to a storage key.
|
||||
@@ -80,7 +80,7 @@ impl Key {
|
||||
}
|
||||
|
||||
/// Get the range of metadata keys.
|
||||
pub fn metadata_key_range() -> RangeInclusive<Self> {
|
||||
pub fn metadata_key_range() -> Range<Self> {
|
||||
Key {
|
||||
field1: METADATA_KEY_BEGIN_PREFIX,
|
||||
field2: 0,
|
||||
@@ -88,13 +88,32 @@ impl Key {
|
||||
field4: 0,
|
||||
field5: 0,
|
||||
field6: 0,
|
||||
}..=Key {
|
||||
field1: u8::MAX,
|
||||
field2: u16::MAX as u32,
|
||||
field3: u32::MAX,
|
||||
field4: u32::MAX,
|
||||
field5: u8::MAX,
|
||||
field6: u32::MAX,
|
||||
}..Key {
|
||||
field1: METADATA_KEY_END_PREFIX,
|
||||
field2: 0,
|
||||
field3: 0,
|
||||
field4: 0,
|
||||
field5: 0,
|
||||
field6: 0,
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the range of aux keys.
|
||||
pub fn metadata_aux_key_range() -> Range<Self> {
|
||||
Key {
|
||||
field1: AUX_KEY_PREFIX,
|
||||
field2: 0,
|
||||
field3: 0,
|
||||
field4: 0,
|
||||
field5: 0,
|
||||
field6: 0,
|
||||
}..Key {
|
||||
field1: AUX_KEY_PREFIX + 1,
|
||||
field2: 0,
|
||||
field3: 0,
|
||||
field4: 0,
|
||||
field5: 0,
|
||||
field6: 0,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -103,7 +122,7 @@ impl Key {
|
||||
/// we can assume that only some predefined namespace OIDs are used which can fit in u16
|
||||
pub fn to_i128(&self) -> i128 {
|
||||
assert!(self.field2 < 0xFFFF || self.field2 == 0xFFFFFFFF || self.field2 == 0x22222222);
|
||||
(((self.field1 & 0xf) as i128) << 120)
|
||||
(((self.field1 & 0x7F) as i128) << 120)
|
||||
| (((self.field2 & 0xFFFF) as i128) << 104)
|
||||
| ((self.field3 as i128) << 72)
|
||||
| ((self.field4 as i128) << 40)
|
||||
@@ -113,7 +132,7 @@ impl Key {
|
||||
|
||||
pub const fn from_i128(x: i128) -> Self {
|
||||
Key {
|
||||
field1: ((x >> 120) & 0xf) as u8,
|
||||
field1: ((x >> 120) & 0x7F) as u8,
|
||||
field2: ((x >> 104) & 0xFFFF) as u32,
|
||||
field3: (x >> 72) as u32,
|
||||
field4: (x >> 40) as u32,
|
||||
|
||||
@@ -1,7 +1,10 @@
|
||||
use postgres_ffi::BLCKSZ;
|
||||
use std::ops::Range;
|
||||
|
||||
use crate::key::Key;
|
||||
use crate::{
|
||||
key::Key,
|
||||
shard::{ShardCount, ShardIdentity},
|
||||
};
|
||||
use itertools::Itertools;
|
||||
|
||||
///
|
||||
@@ -14,44 +17,275 @@ pub struct KeySpace {
|
||||
pub ranges: Vec<Range<Key>>,
|
||||
}
|
||||
|
||||
impl KeySpace {
|
||||
/// Represents a contiguous half-open range of the keyspace, masked according to a particular
|
||||
/// ShardNumber's stripes: within this range of keys, only some "belong" to the current
|
||||
/// shard.
|
||||
///
|
||||
/// When we iterate over keys within this object, we will skip any keys that don't belong
|
||||
/// to this shard.
|
||||
///
|
||||
/// The start + end keys may not belong to the shard: these specify where layer files should
|
||||
/// start + end, but we will never actually read/write those keys.
|
||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||
pub struct ShardedRange<'a> {
|
||||
pub shard_identity: &'a ShardIdentity,
|
||||
pub range: Range<Key>,
|
||||
}
|
||||
|
||||
// Calculate the size of a range within the blocks of the same relation, or spanning only the
|
||||
// top page in the previous relation's space.
|
||||
fn contiguous_range_len(range: &Range<Key>) -> u32 {
|
||||
debug_assert!(is_contiguous_range(range));
|
||||
if range.start.field6 == 0xffffffff {
|
||||
range.end.field6 + 1
|
||||
} else {
|
||||
range.end.field6 - range.start.field6
|
||||
}
|
||||
}
|
||||
|
||||
/// Return true if this key range includes only keys in the same relation's data blocks, or
|
||||
/// just spanning one relation and the logical size (0xffffffff) block of the relation before it.
|
||||
///
|
||||
/// Contiguous in this context means we know the keys are in use _somewhere_, but it might not
|
||||
/// be on our shard. Later in ShardedRange we do the extra work to figure out how much
|
||||
/// of a given contiguous range is present on one shard.
|
||||
///
|
||||
/// This matters, because:
|
||||
/// - Within such ranges, keys are used contiguously. Outside such ranges it is sparse.
|
||||
/// - Within such ranges, we may calculate distances using simple subtraction of field6.
|
||||
fn is_contiguous_range(range: &Range<Key>) -> bool {
|
||||
range.start.field1 == range.end.field1
|
||||
&& range.start.field2 == range.end.field2
|
||||
&& range.start.field3 == range.end.field3
|
||||
&& range.start.field4 == range.end.field4
|
||||
&& (range.start.field5 == range.end.field5
|
||||
|| (range.start.field6 == 0xffffffff && range.start.field5 + 1 == range.end.field5))
|
||||
}
|
||||
|
||||
impl<'a> ShardedRange<'a> {
|
||||
pub fn new(range: Range<Key>, shard_identity: &'a ShardIdentity) -> Self {
|
||||
Self {
|
||||
shard_identity,
|
||||
range,
|
||||
}
|
||||
}
|
||||
|
||||
/// Break up this range into chunks, each of which has at least one local key in it if the
|
||||
/// total range has at least one local key.
|
||||
pub fn fragment(self, target_nblocks: u32) -> Vec<(u32, Range<Key>)> {
|
||||
// Optimization for single-key case (e.g. logical size keys)
|
||||
if self.range.end == self.range.start.add(1) {
|
||||
return vec![(
|
||||
if self.shard_identity.is_key_disposable(&self.range.start) {
|
||||
0
|
||||
} else {
|
||||
1
|
||||
},
|
||||
self.range,
|
||||
)];
|
||||
}
|
||||
|
||||
if !is_contiguous_range(&self.range) {
|
||||
// Ranges that span relations are not fragmented. We only get these ranges as a result
|
||||
// of operations that act on existing layers, so we trust that the existing range is
|
||||
// reasonably small.
|
||||
return vec![(u32::MAX, self.range)];
|
||||
}
|
||||
|
||||
let mut fragments: Vec<(u32, Range<Key>)> = Vec::new();
|
||||
|
||||
let mut cursor = self.range.start;
|
||||
while cursor < self.range.end {
|
||||
let advance_by = self.distance_to_next_boundary(cursor);
|
||||
let is_fragment_disposable = self.shard_identity.is_key_disposable(&cursor);
|
||||
|
||||
// If the previous fragment is undersized, then we seek to consume enough
|
||||
// blocks to complete it.
|
||||
let (want_blocks, merge_last_fragment) = match fragments.last_mut() {
|
||||
Some(frag) if frag.0 < target_nblocks => (target_nblocks - frag.0, Some(frag)),
|
||||
Some(frag) => {
|
||||
// Prev block is complete, want the full number.
|
||||
(
|
||||
target_nblocks,
|
||||
if is_fragment_disposable {
|
||||
// If this current range will be empty (not shard-local data), we will merge into previous
|
||||
Some(frag)
|
||||
} else {
|
||||
None
|
||||
},
|
||||
)
|
||||
}
|
||||
None => {
|
||||
// First iteration, want the full number
|
||||
(target_nblocks, None)
|
||||
}
|
||||
};
|
||||
|
||||
let advance_by = if is_fragment_disposable {
|
||||
advance_by
|
||||
} else {
|
||||
std::cmp::min(advance_by, want_blocks)
|
||||
};
|
||||
|
||||
let next_cursor = cursor.add(advance_by);
|
||||
|
||||
let this_frag = (
|
||||
if is_fragment_disposable {
|
||||
0
|
||||
} else {
|
||||
advance_by
|
||||
},
|
||||
cursor..next_cursor,
|
||||
);
|
||||
cursor = next_cursor;
|
||||
|
||||
if let Some(last_fragment) = merge_last_fragment {
|
||||
// Previous fragment was short or this one is empty, merge into it
|
||||
last_fragment.0 += this_frag.0;
|
||||
last_fragment.1.end = this_frag.1.end;
|
||||
} else {
|
||||
fragments.push(this_frag);
|
||||
}
|
||||
}
|
||||
|
||||
fragments
|
||||
}
|
||||
|
||||
/// Estimate the physical pages that are within this range, on this shard. This returns
|
||||
/// u32::MAX if the range spans relations: this return value should be interpreted as "large".
|
||||
pub fn page_count(&self) -> u32 {
|
||||
// Special cases for single keys like logical sizes
|
||||
if self.range.end == self.range.start.add(1) {
|
||||
return if self.shard_identity.is_key_disposable(&self.range.start) {
|
||||
0
|
||||
} else {
|
||||
1
|
||||
};
|
||||
}
|
||||
|
||||
// We can only do an authentic calculation of contiguous key ranges
|
||||
if !is_contiguous_range(&self.range) {
|
||||
return u32::MAX;
|
||||
}
|
||||
|
||||
// Special case for single sharded tenants: our logical and physical sizes are the same
|
||||
if self.shard_identity.count < ShardCount::new(2) {
|
||||
return contiguous_range_len(&self.range);
|
||||
}
|
||||
|
||||
// Normal path: step through stripes and part-stripes in the range, evaluate whether each one belongs
|
||||
// to Self, and add the stripe's block count to our total if so.
|
||||
let mut result: u64 = 0;
|
||||
let mut cursor = self.range.start;
|
||||
while cursor < self.range.end {
|
||||
// Count up to the next stripe_size boundary or end of range
|
||||
let advance_by = self.distance_to_next_boundary(cursor);
|
||||
|
||||
// If this blocks in this stripe belong to us, add them to our count
|
||||
if !self.shard_identity.is_key_disposable(&cursor) {
|
||||
result += advance_by as u64;
|
||||
}
|
||||
|
||||
cursor = cursor.add(advance_by);
|
||||
}
|
||||
|
||||
if result > u32::MAX as u64 {
|
||||
u32::MAX
|
||||
} else {
|
||||
result as u32
|
||||
}
|
||||
}
|
||||
|
||||
/// Advance the cursor to the next potential fragment boundary: this is either
|
||||
/// a stripe boundary, or the end of the range.
|
||||
fn distance_to_next_boundary(&self, cursor: Key) -> u32 {
|
||||
let distance_to_range_end = contiguous_range_len(&(cursor..self.range.end));
|
||||
|
||||
if self.shard_identity.count < ShardCount::new(2) {
|
||||
// Optimization: don't bother stepping through stripes if the tenant isn't sharded.
|
||||
return distance_to_range_end;
|
||||
}
|
||||
|
||||
if cursor.field6 == 0xffffffff {
|
||||
// We are wrapping from one relation's logical size to the next relation's first data block
|
||||
return 1;
|
||||
}
|
||||
|
||||
let stripe_index = cursor.field6 / self.shard_identity.stripe_size.0;
|
||||
let stripe_remainder = self.shard_identity.stripe_size.0
|
||||
- (cursor.field6 - stripe_index * self.shard_identity.stripe_size.0);
|
||||
|
||||
if cfg!(debug_assertions) {
|
||||
// We should never overflow field5 and field6 -- our callers check this earlier
|
||||
// and would have returned their u32::MAX cases if the input range violated this.
|
||||
let next_cursor = cursor.add(stripe_remainder);
|
||||
debug_assert!(
|
||||
next_cursor.field1 == cursor.field1
|
||||
&& next_cursor.field2 == cursor.field2
|
||||
&& next_cursor.field3 == cursor.field3
|
||||
&& next_cursor.field4 == cursor.field4
|
||||
&& next_cursor.field5 == cursor.field5
|
||||
)
|
||||
}
|
||||
|
||||
std::cmp::min(stripe_remainder, distance_to_range_end)
|
||||
}
|
||||
|
||||
/// Whereas `page_count` estimates the number of pages physically in this range on this shard,
|
||||
/// this function simply calculates the number of pages in the space, without accounting for those
|
||||
/// pages that would not actually be stored on this node.
|
||||
///
|
||||
/// Don't use this function in code that works with physical entities like layer files.
|
||||
fn raw_size(range: &Range<Key>) -> u32 {
|
||||
if is_contiguous_range(range) {
|
||||
contiguous_range_len(range)
|
||||
} else {
|
||||
u32::MAX
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl KeySpace {
|
||||
/// Create a key space with a single range.
|
||||
pub fn single(key_range: Range<Key>) -> Self {
|
||||
Self {
|
||||
ranges: vec![key_range],
|
||||
}
|
||||
}
|
||||
|
||||
/// Partition a key space into roughly chunks of roughly 'target_size' bytes
|
||||
/// in each partition.
|
||||
///
|
||||
pub fn partition(&self, target_size: u64) -> KeyPartitioning {
|
||||
pub fn partition(&self, shard_identity: &ShardIdentity, target_size: u64) -> KeyPartitioning {
|
||||
// Assume that each value is 8k in size.
|
||||
let target_nblocks = (target_size / BLCKSZ as u64) as usize;
|
||||
let target_nblocks = (target_size / BLCKSZ as u64) as u32;
|
||||
|
||||
let mut parts = Vec::new();
|
||||
let mut current_part = Vec::new();
|
||||
let mut current_part_size: usize = 0;
|
||||
for range in &self.ranges {
|
||||
// If appending the next contiguous range in the keyspace to the current
|
||||
// partition would cause it to be too large, start a new partition.
|
||||
let this_size = key_range_size(range) as usize;
|
||||
if current_part_size + this_size > target_nblocks && !current_part.is_empty() {
|
||||
parts.push(KeySpace {
|
||||
ranges: current_part,
|
||||
});
|
||||
current_part = Vec::new();
|
||||
current_part_size = 0;
|
||||
}
|
||||
// While doing partitioning, wrap the range in ShardedRange so that our size calculations
|
||||
// will respect shard striping rather than assuming all keys within a range are present.
|
||||
let range = ShardedRange::new(range.clone(), shard_identity);
|
||||
|
||||
// If the next range is larger than 'target_size', split it into
|
||||
// 'target_size' chunks.
|
||||
let mut remain_size = this_size;
|
||||
let mut start = range.start;
|
||||
while remain_size > target_nblocks {
|
||||
let next = start.add(target_nblocks as u32);
|
||||
parts.push(KeySpace {
|
||||
ranges: vec![start..next],
|
||||
});
|
||||
start = next;
|
||||
remain_size -= target_nblocks
|
||||
// Chunk up the range into parts that each contain up to target_size local blocks
|
||||
for (frag_on_shard_size, frag_range) in range.fragment(target_nblocks) {
|
||||
// If appending the next contiguous range in the keyspace to the current
|
||||
// partition would cause it to be too large, and our current partition
|
||||
// covers at least one block that is physically present in this shard,
|
||||
// then start a new partition
|
||||
if current_part_size + frag_on_shard_size as usize > target_nblocks as usize
|
||||
&& current_part_size > 0
|
||||
{
|
||||
parts.push(KeySpace {
|
||||
ranges: current_part,
|
||||
});
|
||||
current_part = Vec::new();
|
||||
current_part_size = 0;
|
||||
}
|
||||
current_part.push(frag_range.start..frag_range.end);
|
||||
current_part_size += frag_on_shard_size as usize;
|
||||
}
|
||||
current_part.push(start..range.end);
|
||||
current_part_size += remain_size;
|
||||
}
|
||||
|
||||
// add last partition that wasn't full yet.
|
||||
@@ -64,6 +298,10 @@ impl KeySpace {
|
||||
KeyPartitioning { parts }
|
||||
}
|
||||
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.total_raw_size() == 0
|
||||
}
|
||||
|
||||
/// Merge another keyspace into the current one.
|
||||
/// Note: the keyspaces must not ovelap (enforced via assertions)
|
||||
pub fn merge(&mut self, other: &KeySpace) {
|
||||
@@ -154,18 +392,14 @@ impl KeySpace {
|
||||
self.ranges.last().map(|range| range.end)
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
pub fn total_size(&self) -> usize {
|
||||
/// The size of the keyspace in pages, before accounting for sharding
|
||||
pub fn total_raw_size(&self) -> usize {
|
||||
self.ranges
|
||||
.iter()
|
||||
.map(|range| key_range_size(range) as usize)
|
||||
.map(|range| ShardedRange::raw_size(range) as usize)
|
||||
.sum()
|
||||
}
|
||||
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.total_size() == 0
|
||||
}
|
||||
|
||||
fn overlaps_at(&self, range: &Range<Key>) -> Option<usize> {
|
||||
match self.ranges.binary_search_by_key(&range.end, |r| r.start) {
|
||||
Ok(0) => None,
|
||||
@@ -236,7 +470,7 @@ impl KeySpaceAccum {
|
||||
|
||||
#[inline(always)]
|
||||
pub fn add_range(&mut self, range: Range<Key>) {
|
||||
self.size += key_range_size(&range) as u64;
|
||||
self.size += ShardedRange::raw_size(&range) as u64;
|
||||
|
||||
match self.accum.as_mut() {
|
||||
Some(accum) => {
|
||||
@@ -268,7 +502,9 @@ impl KeySpaceAccum {
|
||||
std::mem::take(self).to_keyspace()
|
||||
}
|
||||
|
||||
pub fn size(&self) -> u64 {
|
||||
// The total number of keys in this object, ignoring any sharding effects that might cause some of
|
||||
// the keys to be omitted in storage on this shard.
|
||||
pub fn raw_size(&self) -> u64 {
|
||||
self.size
|
||||
}
|
||||
}
|
||||
@@ -324,36 +560,19 @@ impl KeySpaceRandomAccum {
|
||||
}
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
pub fn key_range_size(key_range: &Range<Key>) -> u32 {
|
||||
let start = key_range.start;
|
||||
let end = key_range.end;
|
||||
|
||||
if end.field1 != start.field1
|
||||
|| end.field2 != start.field2
|
||||
|| end.field3 != start.field3
|
||||
|| end.field4 != start.field4
|
||||
{
|
||||
return u32::MAX;
|
||||
}
|
||||
|
||||
let start = (start.field5 as u64) << 32 | start.field6 as u64;
|
||||
let end = (end.field5 as u64) << 32 | end.field6 as u64;
|
||||
|
||||
let diff = end - start;
|
||||
if diff > u32::MAX as u64 {
|
||||
u32::MAX
|
||||
} else {
|
||||
diff as u32
|
||||
}
|
||||
}
|
||||
|
||||
pub fn singleton_range(key: Key) -> Range<Key> {
|
||||
key..key.next()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use rand::{RngCore, SeedableRng};
|
||||
|
||||
use crate::{
|
||||
models::ShardParameters,
|
||||
shard::{ShardCount, ShardNumber},
|
||||
};
|
||||
|
||||
use super::*;
|
||||
use std::fmt::Write;
|
||||
|
||||
@@ -396,14 +615,17 @@ mod tests {
|
||||
accum.add_range(range.clone());
|
||||
}
|
||||
|
||||
let expected_size: u64 = ranges.iter().map(|r| key_range_size(r) as u64).sum();
|
||||
assert_eq!(accum.size(), expected_size);
|
||||
let expected_size: u64 = ranges
|
||||
.iter()
|
||||
.map(|r| ShardedRange::raw_size(r) as u64)
|
||||
.sum();
|
||||
assert_eq!(accum.raw_size(), expected_size);
|
||||
|
||||
assert_ks_eq(&accum.consume_keyspace(), ranges.clone());
|
||||
assert_eq!(accum.size(), 0);
|
||||
assert_eq!(accum.raw_size(), 0);
|
||||
|
||||
assert_ks_eq(&accum.consume_keyspace(), vec![]);
|
||||
assert_eq!(accum.size(), 0);
|
||||
assert_eq!(accum.raw_size(), 0);
|
||||
|
||||
for range in &ranges {
|
||||
accum.add_range(range.clone());
|
||||
@@ -700,4 +922,412 @@ mod tests {
|
||||
]
|
||||
);
|
||||
}
|
||||
#[test]
|
||||
fn sharded_range_relation_gap() {
|
||||
let shard_identity = ShardIdentity::new(
|
||||
ShardNumber(0),
|
||||
ShardCount::new(4),
|
||||
ShardParameters::DEFAULT_STRIPE_SIZE,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let range = ShardedRange::new(
|
||||
Range {
|
||||
start: Key::from_hex("000000067F00000005000040100300000000").unwrap(),
|
||||
end: Key::from_hex("000000067F00000005000040130000004000").unwrap(),
|
||||
},
|
||||
&shard_identity,
|
||||
);
|
||||
|
||||
// Key range spans relations, expect MAX
|
||||
assert_eq!(range.page_count(), u32::MAX);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn shard_identity_keyspaces_single_key() {
|
||||
let shard_identity = ShardIdentity::new(
|
||||
ShardNumber(1),
|
||||
ShardCount::new(4),
|
||||
ShardParameters::DEFAULT_STRIPE_SIZE,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let range = ShardedRange::new(
|
||||
Range {
|
||||
start: Key::from_hex("000000067f000000010000007000ffffffff").unwrap(),
|
||||
end: Key::from_hex("000000067f00000001000000700100000000").unwrap(),
|
||||
},
|
||||
&shard_identity,
|
||||
);
|
||||
// Single-key range on logical size key
|
||||
assert_eq!(range.page_count(), 1);
|
||||
}
|
||||
|
||||
/// Test the helper that we use to identify ranges which go outside the data blocks of a single relation
|
||||
#[test]
|
||||
fn contiguous_range_check() {
|
||||
assert!(!is_contiguous_range(
|
||||
&(Key::from_hex("000000067f00000001000004df00fffffffe").unwrap()
|
||||
..Key::from_hex("000000067f00000001000004df0100000003").unwrap())
|
||||
),);
|
||||
|
||||
// The ranges goes all the way up to the 0xffffffff, including it: this is
|
||||
// not considered a rel block range because 0xffffffff stores logical sizes,
|
||||
// not blocks.
|
||||
assert!(!is_contiguous_range(
|
||||
&(Key::from_hex("000000067f00000001000004df00fffffffe").unwrap()
|
||||
..Key::from_hex("000000067f00000001000004df0100000000").unwrap())
|
||||
),);
|
||||
|
||||
// Keys within the normal data region of a relation
|
||||
assert!(is_contiguous_range(
|
||||
&(Key::from_hex("000000067f00000001000004df0000000000").unwrap()
|
||||
..Key::from_hex("000000067f00000001000004df0000000080").unwrap())
|
||||
),);
|
||||
|
||||
// The logical size key of one forkno, then some blocks in the next
|
||||
assert!(is_contiguous_range(
|
||||
&(Key::from_hex("000000067f00000001000004df00ffffffff").unwrap()
|
||||
..Key::from_hex("000000067f00000001000004df0100000080").unwrap())
|
||||
),);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn shard_identity_keyspaces_forkno_gap() {
|
||||
let shard_identity = ShardIdentity::new(
|
||||
ShardNumber(1),
|
||||
ShardCount::new(4),
|
||||
ShardParameters::DEFAULT_STRIPE_SIZE,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let range = ShardedRange::new(
|
||||
Range {
|
||||
start: Key::from_hex("000000067f00000001000004df00fffffffe").unwrap(),
|
||||
end: Key::from_hex("000000067f00000001000004df0100000003").unwrap(),
|
||||
},
|
||||
&shard_identity,
|
||||
);
|
||||
|
||||
// Range spanning the end of one forkno and the start of the next: we do not attempt to
|
||||
// calculate a valid size, because we have no way to know if they keys between start
|
||||
// and end are actually in use.
|
||||
assert_eq!(range.page_count(), u32::MAX);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn shard_identity_keyspaces_one_relation() {
|
||||
for shard_number in 0..4 {
|
||||
let shard_identity = ShardIdentity::new(
|
||||
ShardNumber(shard_number),
|
||||
ShardCount::new(4),
|
||||
ShardParameters::DEFAULT_STRIPE_SIZE,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let range = ShardedRange::new(
|
||||
Range {
|
||||
start: Key::from_hex("000000067f00000001000000ae0000000000").unwrap(),
|
||||
end: Key::from_hex("000000067f00000001000000ae0000000001").unwrap(),
|
||||
},
|
||||
&shard_identity,
|
||||
);
|
||||
|
||||
// Very simple case: range covering block zero of one relation, where that block maps to shard zero
|
||||
if shard_number == 0 {
|
||||
assert_eq!(range.page_count(), 1);
|
||||
} else {
|
||||
// Other shards should perceive the range's size as zero
|
||||
assert_eq!(range.page_count(), 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Test helper: construct a ShardedRange and call fragment() on it, returning
|
||||
/// the total page count in the range and the fragments.
|
||||
fn do_fragment(
|
||||
range_start: Key,
|
||||
range_end: Key,
|
||||
shard_identity: &ShardIdentity,
|
||||
target_nblocks: u32,
|
||||
) -> (u32, Vec<(u32, Range<Key>)>) {
|
||||
let range = ShardedRange::new(
|
||||
Range {
|
||||
start: range_start,
|
||||
end: range_end,
|
||||
},
|
||||
shard_identity,
|
||||
);
|
||||
|
||||
let page_count = range.page_count();
|
||||
let fragments = range.fragment(target_nblocks);
|
||||
|
||||
// Invariant: we always get at least one fragment
|
||||
assert!(!fragments.is_empty());
|
||||
|
||||
// Invariant: the first/last fragment start/end should equal the input start/end
|
||||
assert_eq!(fragments.first().unwrap().1.start, range_start);
|
||||
assert_eq!(fragments.last().unwrap().1.end, range_end);
|
||||
|
||||
if page_count > 0 {
|
||||
// Invariant: every fragment must contain at least one shard-local page, if the
|
||||
// total range contains at least one shard-local page
|
||||
let all_nonzero = fragments.iter().all(|f| f.0 > 0);
|
||||
if !all_nonzero {
|
||||
eprintln!("Found a zero-length fragment: {:?}", fragments);
|
||||
}
|
||||
assert!(all_nonzero);
|
||||
} else {
|
||||
// A range with no shard-local pages should always be returned as a single fragment
|
||||
assert_eq!(fragments, vec![(0, range_start..range_end)]);
|
||||
}
|
||||
|
||||
// Invariant: fragments must be ordered and non-overlapping
|
||||
let mut last: Option<Range<Key>> = None;
|
||||
for frag in &fragments {
|
||||
if let Some(last) = last {
|
||||
assert!(frag.1.start >= last.end);
|
||||
assert!(frag.1.start > last.start);
|
||||
}
|
||||
last = Some(frag.1.clone())
|
||||
}
|
||||
|
||||
// Invariant: fragments respect target_nblocks
|
||||
for frag in &fragments {
|
||||
assert!(frag.0 == u32::MAX || frag.0 <= target_nblocks);
|
||||
}
|
||||
|
||||
(page_count, fragments)
|
||||
}
|
||||
|
||||
/// Really simple tests for fragment(), on a range that just contains a single stripe
|
||||
/// for a single tenant.
|
||||
#[test]
|
||||
fn sharded_range_fragment_simple() {
|
||||
let shard_identity = ShardIdentity::new(
|
||||
ShardNumber(0),
|
||||
ShardCount::new(4),
|
||||
ShardParameters::DEFAULT_STRIPE_SIZE,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// A range which we happen to know covers exactly one stripe which belongs to this shard
|
||||
let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
|
||||
let input_end = Key::from_hex("000000067f00000001000000ae0000008000").unwrap();
|
||||
|
||||
// Ask for stripe_size blocks, we get the whole stripe
|
||||
assert_eq!(
|
||||
do_fragment(input_start, input_end, &shard_identity, 32768),
|
||||
(32768, vec![(32768, input_start..input_end)])
|
||||
);
|
||||
|
||||
// Ask for more, we still get the whole stripe
|
||||
assert_eq!(
|
||||
do_fragment(input_start, input_end, &shard_identity, 10000000),
|
||||
(32768, vec![(32768, input_start..input_end)])
|
||||
);
|
||||
|
||||
// Ask for target_nblocks of half the stripe size, we get two halves
|
||||
assert_eq!(
|
||||
do_fragment(input_start, input_end, &shard_identity, 16384),
|
||||
(
|
||||
32768,
|
||||
vec![
|
||||
(16384, input_start..input_start.add(16384)),
|
||||
(16384, input_start.add(16384)..input_end)
|
||||
]
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn sharded_range_fragment_multi_stripe() {
|
||||
let shard_identity = ShardIdentity::new(
|
||||
ShardNumber(0),
|
||||
ShardCount::new(4),
|
||||
ShardParameters::DEFAULT_STRIPE_SIZE,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// A range which covers multiple stripes, exactly one of which belongs to the current shard.
|
||||
let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
|
||||
let input_end = Key::from_hex("000000067f00000001000000ae0000020000").unwrap();
|
||||
// Ask for all the blocks, get a fragment that covers the whole range but reports
|
||||
// its size to be just the blocks belonging to our shard.
|
||||
assert_eq!(
|
||||
do_fragment(input_start, input_end, &shard_identity, 131072),
|
||||
(32768, vec![(32768, input_start..input_end)])
|
||||
);
|
||||
|
||||
// Ask for a sub-stripe quantity
|
||||
assert_eq!(
|
||||
do_fragment(input_start, input_end, &shard_identity, 16000),
|
||||
(
|
||||
32768,
|
||||
vec![
|
||||
(16000, input_start..input_start.add(16000)),
|
||||
(16000, input_start.add(16000)..input_start.add(32000)),
|
||||
(768, input_start.add(32000)..input_end),
|
||||
]
|
||||
)
|
||||
);
|
||||
|
||||
// Try on a range that starts slightly after our owned stripe
|
||||
assert_eq!(
|
||||
do_fragment(input_start.add(1), input_end, &shard_identity, 131072),
|
||||
(32767, vec![(32767, input_start.add(1)..input_end)])
|
||||
);
|
||||
}
|
||||
|
||||
/// Test our calculations work correctly when we start a range from the logical size key of
|
||||
/// a previous relation.
|
||||
#[test]
|
||||
fn sharded_range_fragment_starting_from_logical_size() {
|
||||
let input_start = Key::from_hex("000000067f00000001000000ae00ffffffff").unwrap();
|
||||
let input_end = Key::from_hex("000000067f00000001000000ae0100008000").unwrap();
|
||||
|
||||
// Shard 0 owns the first stripe in the relation, and the preceding logical size is shard local too
|
||||
let shard_identity = ShardIdentity::new(
|
||||
ShardNumber(0),
|
||||
ShardCount::new(4),
|
||||
ShardParameters::DEFAULT_STRIPE_SIZE,
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
do_fragment(input_start, input_end, &shard_identity, 0x10000),
|
||||
(0x8001, vec![(0x8001, input_start..input_end)])
|
||||
);
|
||||
|
||||
// Shard 1 does not own the first stripe in the relation, but it does own the logical size (all shards
|
||||
// store all logical sizes)
|
||||
let shard_identity = ShardIdentity::new(
|
||||
ShardNumber(1),
|
||||
ShardCount::new(4),
|
||||
ShardParameters::DEFAULT_STRIPE_SIZE,
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
do_fragment(input_start, input_end, &shard_identity, 0x10000),
|
||||
(0x1, vec![(0x1, input_start..input_end)])
|
||||
);
|
||||
}
|
||||
|
||||
/// Test that ShardedRange behaves properly when used on un-sharded data
|
||||
#[test]
|
||||
fn sharded_range_fragment_unsharded() {
|
||||
let shard_identity = ShardIdentity::unsharded();
|
||||
|
||||
let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
|
||||
let input_end = Key::from_hex("000000067f00000001000000ae0000010000").unwrap();
|
||||
assert_eq!(
|
||||
do_fragment(input_start, input_end, &shard_identity, 0x8000),
|
||||
(
|
||||
0x10000,
|
||||
vec![
|
||||
(0x8000, input_start..input_start.add(0x8000)),
|
||||
(0x8000, input_start.add(0x8000)..input_start.add(0x10000))
|
||||
]
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn sharded_range_fragment_cross_relation() {
|
||||
let shard_identity = ShardIdentity::unsharded();
|
||||
|
||||
// A range that spans relations: expect fragmentation to give up and return a u32::MAX size
|
||||
let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
|
||||
let input_end = Key::from_hex("000000068f00000001000000ae0000010000").unwrap();
|
||||
assert_eq!(
|
||||
do_fragment(input_start, input_end, &shard_identity, 0x8000),
|
||||
(u32::MAX, vec![(u32::MAX, input_start..input_end),])
|
||||
);
|
||||
|
||||
// Same, but using a sharded identity
|
||||
let shard_identity = ShardIdentity::new(
|
||||
ShardNumber(0),
|
||||
ShardCount::new(4),
|
||||
ShardParameters::DEFAULT_STRIPE_SIZE,
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
do_fragment(input_start, input_end, &shard_identity, 0x8000),
|
||||
(u32::MAX, vec![(u32::MAX, input_start..input_end),])
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn sharded_range_fragment_tiny_nblocks() {
|
||||
let shard_identity = ShardIdentity::unsharded();
|
||||
|
||||
// A range that spans relations: expect fragmentation to give up and return a u32::MAX size
|
||||
let input_start = Key::from_hex("000000067F00000001000004E10000000000").unwrap();
|
||||
let input_end = Key::from_hex("000000067F00000001000004E10000000038").unwrap();
|
||||
assert_eq!(
|
||||
do_fragment(input_start, input_end, &shard_identity, 16),
|
||||
(
|
||||
0x38,
|
||||
vec![
|
||||
(16, input_start..input_start.add(16)),
|
||||
(16, input_start.add(16)..input_start.add(32)),
|
||||
(16, input_start.add(32)..input_start.add(48)),
|
||||
(8, input_start.add(48)..input_end),
|
||||
]
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn sharded_range_fragment_fuzz() {
|
||||
// Use a fixed seed: we don't want to explicitly pick values, but we do want
|
||||
// the test to be reproducible.
|
||||
let mut prng = rand::rngs::StdRng::seed_from_u64(0xdeadbeef);
|
||||
|
||||
for _i in 0..1000 {
|
||||
let shard_identity = if prng.next_u32() % 2 == 0 {
|
||||
ShardIdentity::unsharded()
|
||||
} else {
|
||||
let shard_count = prng.next_u32() % 127 + 1;
|
||||
ShardIdentity::new(
|
||||
ShardNumber((prng.next_u32() % shard_count) as u8),
|
||||
ShardCount::new(shard_count as u8),
|
||||
ShardParameters::DEFAULT_STRIPE_SIZE,
|
||||
)
|
||||
.unwrap()
|
||||
};
|
||||
|
||||
let target_nblocks = prng.next_u32() % 65536 + 1;
|
||||
|
||||
let start_offset = prng.next_u32() % 16384;
|
||||
|
||||
// Try ranges up to 4GiB in size, that are always at least 1
|
||||
let range_size = prng.next_u32() % 8192 + 1;
|
||||
|
||||
// A range that spans relations: expect fragmentation to give up and return a u32::MAX size
|
||||
let input_start = Key::from_hex("000000067F00000001000004E10000000000")
|
||||
.unwrap()
|
||||
.add(start_offset);
|
||||
let input_end = input_start.add(range_size);
|
||||
|
||||
// This test's main success conditions are the invariants baked into do_fragment
|
||||
let (_total_size, fragments) =
|
||||
do_fragment(input_start, input_end, &shard_identity, target_nblocks);
|
||||
|
||||
// Pick a random key within the range and check it appears in the output
|
||||
let example_key = input_start.add(prng.next_u32() % range_size);
|
||||
|
||||
// Panic on unwrap if it isn't found
|
||||
let example_key_frag = fragments
|
||||
.iter()
|
||||
.find(|f| f.1.contains(&example_key))
|
||||
.unwrap();
|
||||
|
||||
// Check that the fragment containing our random key has a nonzero size if
|
||||
// that key is shard-local
|
||||
let example_key_local = !shard_identity.is_key_disposable(&example_key);
|
||||
if example_key_local {
|
||||
assert!(example_key_frag.0 > 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -303,6 +303,7 @@ pub struct TenantConfig {
|
||||
pub lazy_slru_download: Option<bool>,
|
||||
pub timeline_get_throttle: Option<ThrottleConfig>,
|
||||
pub image_layer_creation_check_threshold: Option<u8>,
|
||||
pub switch_to_aux_file_v2: Option<bool>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
@@ -781,6 +782,17 @@ pub struct SecondaryProgress {
|
||||
pub bytes_total: u64,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub struct TenantScanRemoteStorageShard {
|
||||
pub tenant_shard_id: TenantShardId,
|
||||
pub generation: Option<u32>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Default)]
|
||||
pub struct TenantScanRemoteStorageResponse {
|
||||
pub shards: Vec<TenantScanRemoteStorageShard>,
|
||||
}
|
||||
|
||||
pub mod virtual_file {
|
||||
#[derive(
|
||||
Copy,
|
||||
|
||||
@@ -451,7 +451,7 @@ impl ShardIdentity {
|
||||
/// An identity with number=0 count=0 is a "none" identity, which represents legacy
|
||||
/// tenants. Modern single-shard tenants should not use this: they should
|
||||
/// have number=0 count=1.
|
||||
pub fn unsharded() -> Self {
|
||||
pub const fn unsharded() -> Self {
|
||||
Self {
|
||||
number: ShardNumber(0),
|
||||
count: ShardCount(0),
|
||||
@@ -538,24 +538,6 @@ impl ShardIdentity {
|
||||
}
|
||||
}
|
||||
|
||||
/// Special case for issue `<https://github.com/neondatabase/neon/issues/7451>`
|
||||
///
|
||||
/// When we fail to read a forknum block, this function tells us whether we may ignore the error
|
||||
/// as a symptom of that issue.
|
||||
pub fn is_key_buggy_forknum(&self, key: &Key) -> bool {
|
||||
if !is_rel_block_key(key) || key.field5 != INIT_FORKNUM {
|
||||
return false;
|
||||
}
|
||||
|
||||
let mut hash = murmurhash32(key.field4);
|
||||
hash = hash_combine(hash, murmurhash32(key.field6 / self.stripe_size.0));
|
||||
let mapped_shard = ShardNumber((hash % self.count.0 as u32) as u8);
|
||||
|
||||
// The key may be affected by issue #7454: it is an initfork and it would not
|
||||
// have mapped to shard 0 until we fixed that issue.
|
||||
mapped_shard != ShardNumber(0)
|
||||
}
|
||||
|
||||
/// Return true if the key should be discarded if found in this shard's
|
||||
/// data store, e.g. during compaction after a split.
|
||||
///
|
||||
|
||||
@@ -78,18 +78,18 @@ pub fn is_expected_io_error(e: &io::Error) -> bool {
|
||||
)
|
||||
}
|
||||
|
||||
pub trait Handler<IO>: HandlerSync<IO> {
|
||||
#[async_trait::async_trait]
|
||||
pub trait Handler<IO> {
|
||||
/// Handle single query.
|
||||
/// postgres_backend will issue ReadyForQuery after calling this (this
|
||||
/// might be not what we want after CopyData streaming, but currently we don't
|
||||
/// care). It will also flush out the output buffer.
|
||||
fn process_query(
|
||||
async fn process_query(
|
||||
&mut self,
|
||||
pgb: &mut PostgresBackend<IO>,
|
||||
query_string: &str,
|
||||
) -> impl Future<Output = Result<(), QueryError>> + Send;
|
||||
}
|
||||
pub trait HandlerSync<IO> {
|
||||
) -> Result<(), QueryError>;
|
||||
|
||||
/// Called on startup packet receival, allows to process params.
|
||||
///
|
||||
/// If Ok(false) is returned postgres_backend will skip auth -- that is needed for new users
|
||||
|
||||
@@ -34,6 +34,8 @@ pub enum Generation {
|
||||
/// scenarios where pageservers might otherwise issue conflicting writes to
|
||||
/// remote storage
|
||||
impl Generation {
|
||||
pub const MAX: Self = Self::Valid(u32::MAX);
|
||||
|
||||
/// Create a new Generation that represents a legacy key format with
|
||||
/// no generation suffix
|
||||
pub fn none() -> Self {
|
||||
|
||||
@@ -243,6 +243,19 @@ impl Client {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn tenant_scan_remote_storage(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
) -> Result<TenantScanRemoteStorageResponse> {
|
||||
let uri = format!(
|
||||
"{}/v1/tenant/{tenant_id}/scan_remote_storage",
|
||||
self.mgmt_api_endpoint
|
||||
);
|
||||
let response = self.request(Method::GET, &uri, ()).await?;
|
||||
let body = response.json().await.map_err(Error::ReceiveBody)?;
|
||||
Ok(body)
|
||||
}
|
||||
|
||||
pub async fn tenant_config(&self, req: &TenantConfigRequest) -> Result<()> {
|
||||
let uri = format!("{}/v1/tenant/config", self.mgmt_api_endpoint);
|
||||
self.request(Method::PUT, &uri, req).await?;
|
||||
|
||||
@@ -18,6 +18,7 @@
|
||||
//! database size. For example, if the logical database size is 10 GB, we would
|
||||
//! generate new image layers every 10 GB of WAL.
|
||||
use futures::StreamExt;
|
||||
use pageserver_api::shard::ShardIdentity;
|
||||
use tracing::{debug, info};
|
||||
|
||||
use std::collections::{HashSet, VecDeque};
|
||||
@@ -125,6 +126,7 @@ async fn compact_level<E: CompactionJobExecutor>(
|
||||
}
|
||||
|
||||
let mut state = LevelCompactionState {
|
||||
shard_identity: *executor.get_shard_identity(),
|
||||
target_file_size,
|
||||
_lsn_range: lsn_range.clone(),
|
||||
layers: layer_fragments,
|
||||
@@ -164,6 +166,8 @@ struct LevelCompactionState<'a, E>
|
||||
where
|
||||
E: CompactionJobExecutor,
|
||||
{
|
||||
shard_identity: ShardIdentity,
|
||||
|
||||
// parameters
|
||||
target_file_size: u64,
|
||||
|
||||
@@ -366,6 +370,7 @@ where
|
||||
.executor
|
||||
.get_keyspace(&job.key_range, job.lsn_range.end, ctx)
|
||||
.await?,
|
||||
&self.shard_identity,
|
||||
) * 8192;
|
||||
|
||||
let wal_size = job
|
||||
@@ -430,7 +435,7 @@ where
|
||||
keyspace,
|
||||
self.target_file_size / 8192,
|
||||
);
|
||||
while let Some(key_range) = window.choose_next_image() {
|
||||
while let Some(key_range) = window.choose_next_image(&self.shard_identity) {
|
||||
new_jobs.push(CompactionJob::<E> {
|
||||
key_range,
|
||||
lsn_range: job.lsn_range.clone(),
|
||||
@@ -623,7 +628,12 @@ impl<K: CompactionKey> KeyspaceWindowPos<K> {
|
||||
}
|
||||
|
||||
// Advance the cursor until it reaches 'target_keysize'.
|
||||
fn advance_until_size(&mut self, w: &KeyspaceWindowHead<K>, max_size: u64) {
|
||||
fn advance_until_size(
|
||||
&mut self,
|
||||
w: &KeyspaceWindowHead<K>,
|
||||
max_size: u64,
|
||||
shard_identity: &ShardIdentity,
|
||||
) {
|
||||
while self.accum_keysize < max_size && !self.reached_end(w) {
|
||||
let curr_range = &w.keyspace[self.keyspace_idx];
|
||||
if self.end_key < curr_range.start {
|
||||
@@ -632,7 +642,7 @@ impl<K: CompactionKey> KeyspaceWindowPos<K> {
|
||||
}
|
||||
|
||||
// We're now within 'curr_range'. Can we advance past it completely?
|
||||
let distance = K::key_range_size(&(self.end_key..curr_range.end));
|
||||
let distance = K::key_range_size(&(self.end_key..curr_range.end), shard_identity);
|
||||
if (self.accum_keysize + distance as u64) < max_size {
|
||||
// oh yeah, it fits
|
||||
self.end_key = curr_range.end;
|
||||
@@ -641,7 +651,7 @@ impl<K: CompactionKey> KeyspaceWindowPos<K> {
|
||||
} else {
|
||||
// advance within the range
|
||||
let skip_key = self.end_key.skip_some();
|
||||
let distance = K::key_range_size(&(self.end_key..skip_key));
|
||||
let distance = K::key_range_size(&(self.end_key..skip_key), shard_identity);
|
||||
if (self.accum_keysize + distance as u64) < max_size {
|
||||
self.end_key = skip_key;
|
||||
self.accum_keysize += distance as u64;
|
||||
@@ -677,7 +687,7 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
fn choose_next_image(&mut self) -> Option<Range<K>> {
|
||||
fn choose_next_image(&mut self, shard_identity: &ShardIdentity) -> Option<Range<K>> {
|
||||
if self.start_pos.keyspace_idx == self.head.keyspace.len() {
|
||||
// we've reached the end
|
||||
return None;
|
||||
@@ -687,6 +697,7 @@ where
|
||||
next_pos.advance_until_size(
|
||||
&self.head,
|
||||
self.start_pos.accum_keysize + self.head.target_keysize,
|
||||
shard_identity,
|
||||
);
|
||||
|
||||
// See if we can gobble up the rest of the keyspace if we stretch out the layer, up to
|
||||
@@ -695,6 +706,7 @@ where
|
||||
end_pos.advance_until_size(
|
||||
&self.head,
|
||||
self.start_pos.accum_keysize + (self.head.target_keysize * 5 / 4),
|
||||
shard_identity,
|
||||
);
|
||||
if end_pos.reached_end(&self.head) {
|
||||
// gobble up any unused keyspace between the last used key and end of the range
|
||||
|
||||
@@ -5,6 +5,7 @@ use crate::interface::*;
|
||||
use futures::future::BoxFuture;
|
||||
use futures::{Stream, StreamExt};
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::shard::ShardIdentity;
|
||||
use pin_project_lite::pin_project;
|
||||
use std::collections::BinaryHeap;
|
||||
use std::collections::VecDeque;
|
||||
@@ -13,11 +14,17 @@ use std::ops::{DerefMut, Range};
|
||||
use std::pin::Pin;
|
||||
use std::task::{ready, Poll};
|
||||
|
||||
pub fn keyspace_total_size<K>(keyspace: &CompactionKeySpace<K>) -> u64
|
||||
pub fn keyspace_total_size<K>(
|
||||
keyspace: &CompactionKeySpace<K>,
|
||||
shard_identity: &ShardIdentity,
|
||||
) -> u64
|
||||
where
|
||||
K: CompactionKey,
|
||||
{
|
||||
keyspace.iter().map(|r| K::key_range_size(r) as u64).sum()
|
||||
keyspace
|
||||
.iter()
|
||||
.map(|r| K::key_range_size(r, shard_identity) as u64)
|
||||
.sum()
|
||||
}
|
||||
|
||||
pub fn overlaps_with<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
//! All the heavy lifting is done by the create_image and create_delta
|
||||
//! functions that the implementor provides.
|
||||
use futures::Future;
|
||||
use pageserver_api::{key::Key, keyspace::key_range_size};
|
||||
use pageserver_api::{key::Key, keyspace::ShardedRange, shard::ShardIdentity};
|
||||
use std::ops::Range;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
@@ -32,6 +32,8 @@ pub trait CompactionJobExecutor {
|
||||
// Functions that the planner uses to support its decisions
|
||||
// ----
|
||||
|
||||
fn get_shard_identity(&self) -> &ShardIdentity;
|
||||
|
||||
/// Return all layers that overlap the given bounding box.
|
||||
fn get_layers(
|
||||
&mut self,
|
||||
@@ -98,7 +100,7 @@ pub trait CompactionKey: std::cmp::Ord + Clone + Copy + std::fmt::Display {
|
||||
///
|
||||
/// This returns u32, for compatibility with Repository::key. If the
|
||||
/// distance is larger, return u32::MAX.
|
||||
fn key_range_size(key_range: &Range<Self>) -> u32;
|
||||
fn key_range_size(key_range: &Range<Self>, shard_identity: &ShardIdentity) -> u32;
|
||||
|
||||
// return "self + 1"
|
||||
fn next(&self) -> Self;
|
||||
@@ -113,8 +115,8 @@ impl CompactionKey for Key {
|
||||
const MIN: Self = Self::MIN;
|
||||
const MAX: Self = Self::MAX;
|
||||
|
||||
fn key_range_size(r: &std::ops::Range<Self>) -> u32 {
|
||||
key_range_size(r)
|
||||
fn key_range_size(r: &std::ops::Range<Self>, shard_identity: &ShardIdentity) -> u32 {
|
||||
ShardedRange::new(r.clone(), shard_identity).page_count()
|
||||
}
|
||||
fn next(&self) -> Key {
|
||||
(self as &Key).next()
|
||||
|
||||
@@ -3,6 +3,7 @@ mod draw;
|
||||
use draw::{LayerTraceEvent, LayerTraceFile, LayerTraceOp};
|
||||
|
||||
use futures::StreamExt;
|
||||
use pageserver_api::shard::ShardIdentity;
|
||||
use rand::Rng;
|
||||
use tracing::info;
|
||||
|
||||
@@ -71,7 +72,7 @@ impl interface::CompactionKey for Key {
|
||||
const MIN: Self = u64::MIN;
|
||||
const MAX: Self = u64::MAX;
|
||||
|
||||
fn key_range_size(key_range: &Range<Self>) -> u32 {
|
||||
fn key_range_size(key_range: &Range<Self>, _shard_identity: &ShardIdentity) -> u32 {
|
||||
std::cmp::min(key_range.end - key_range.start, u32::MAX as u64) as u32
|
||||
}
|
||||
|
||||
@@ -434,6 +435,11 @@ impl interface::CompactionJobExecutor for MockTimeline {
|
||||
type ImageLayer = Arc<MockImageLayer>;
|
||||
type RequestContext = MockRequestContext;
|
||||
|
||||
fn get_shard_identity(&self) -> &ShardIdentity {
|
||||
static IDENTITY: ShardIdentity = ShardIdentity::unsharded();
|
||||
&IDENTITY
|
||||
}
|
||||
|
||||
async fn get_layers(
|
||||
&mut self,
|
||||
key_range: &Range<Self::Key>,
|
||||
|
||||
@@ -85,27 +85,27 @@ mod tests {
|
||||
// To correct retrieve AUX files, the generated keys for the same file must be the same for all versions
|
||||
// of the page server.
|
||||
assert_eq!(
|
||||
"8200000101E5B20C5F8DD5AA3289D6D9EAFA",
|
||||
"6200000101E5B20C5F8DD5AA3289D6D9EAFA",
|
||||
encode_aux_file_key("pg_logical/mappings/test1").to_string()
|
||||
);
|
||||
assert_eq!(
|
||||
"820000010239AAC544893139B26F501B97E6",
|
||||
"620000010239AAC544893139B26F501B97E6",
|
||||
encode_aux_file_key("pg_logical/snapshots/test2").to_string()
|
||||
);
|
||||
assert_eq!(
|
||||
"820000010300000000000000000000000000",
|
||||
"620000010300000000000000000000000000",
|
||||
encode_aux_file_key("pg_logical/replorigin_checkpoint").to_string()
|
||||
);
|
||||
assert_eq!(
|
||||
"82000001FF8635AF2134B7266EC5B4189FD6",
|
||||
"62000001FF8635AF2134B7266EC5B4189FD6",
|
||||
encode_aux_file_key("pg_logical/unsupported").to_string()
|
||||
);
|
||||
assert_eq!(
|
||||
"8200000201772D0E5D71DE14DA86142A1619",
|
||||
"6200000201772D0E5D71DE14DA86142A1619",
|
||||
encode_aux_file_key("pg_replslot/test3").to_string()
|
||||
);
|
||||
assert_eq!(
|
||||
"820000FFFF1866EBEB53B807B26A2416F317",
|
||||
"620000FFFF1866EBEB53B807B26A2416F317",
|
||||
encode_aux_file_key("other_file_not_supported").to_string()
|
||||
);
|
||||
}
|
||||
|
||||
@@ -13,7 +13,7 @@
|
||||
use anyhow::{anyhow, bail, ensure, Context};
|
||||
use bytes::{BufMut, Bytes, BytesMut};
|
||||
use fail::fail_point;
|
||||
use pageserver_api::key::{key_to_slru_block, rel_block_to_key, Key};
|
||||
use pageserver_api::key::{key_to_slru_block, Key};
|
||||
use postgres_ffi::pg_constants;
|
||||
use std::fmt::Write as FmtWrite;
|
||||
use std::time::SystemTime;
|
||||
@@ -263,7 +263,10 @@ where
|
||||
.timeline
|
||||
.get_slru_keyspace(Version::Lsn(self.lsn), self.ctx)
|
||||
.await?
|
||||
.partition(Timeline::MAX_GET_VECTORED_KEYS * BLCKSZ as u64);
|
||||
.partition(
|
||||
self.timeline.get_shard_identity(),
|
||||
Timeline::MAX_GET_VECTORED_KEYS * BLCKSZ as u64,
|
||||
);
|
||||
|
||||
let mut slru_builder = SlruSegmentsBuilder::new(&mut self.ar);
|
||||
|
||||
@@ -297,20 +300,7 @@ where
|
||||
if rel.forknum == INIT_FORKNUM {
|
||||
// I doubt we need _init fork itself, but having it at least
|
||||
// serves as a marker relation is unlogged.
|
||||
if let Err(_e) = self.add_rel(rel, rel).await {
|
||||
if self
|
||||
.timeline
|
||||
.get_shard_identity()
|
||||
.is_key_buggy_forknum(&rel_block_to_key(rel, 0x0))
|
||||
{
|
||||
// Workaround https://github.com/neondatabase/neon/issues/7451 -- if we have an unlogged relation
|
||||
// whose INIT_FORKNUM is not correctly on shard zero, then omit it in the basebackup. This allows
|
||||
// postgres to start up. The relation won't work, but it will be possible to DROP TABLE on it and
|
||||
// recreate.
|
||||
tracing::warn!("Omitting relation {rel} for issue #7451: drop and recreate this unlogged relation");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
self.add_rel(rel, rel).await?;
|
||||
self.add_rel(rel, rel.with_forknum(MAIN_FORKNUM)).await?;
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -19,6 +19,8 @@ use pageserver_api::models::LocationConfigListResponse;
|
||||
use pageserver_api::models::ShardParameters;
|
||||
use pageserver_api::models::TenantDetails;
|
||||
use pageserver_api::models::TenantLocationConfigResponse;
|
||||
use pageserver_api::models::TenantScanRemoteStorageResponse;
|
||||
use pageserver_api::models::TenantScanRemoteStorageShard;
|
||||
use pageserver_api::models::TenantShardLocation;
|
||||
use pageserver_api::models::TenantShardSplitRequest;
|
||||
use pageserver_api::models::TenantShardSplitResponse;
|
||||
@@ -29,6 +31,7 @@ use pageserver_api::models::{
|
||||
};
|
||||
use pageserver_api::shard::ShardCount;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use remote_storage::DownloadError;
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
use remote_storage::TimeTravelError;
|
||||
use tenant_size_model::{SizeResult, StorageModel};
|
||||
@@ -54,6 +57,9 @@ use crate::tenant::mgr::{
|
||||
};
|
||||
use crate::tenant::mgr::{TenantSlot, UpsertLocationError};
|
||||
use crate::tenant::remote_timeline_client;
|
||||
use crate::tenant::remote_timeline_client::download_index_part;
|
||||
use crate::tenant::remote_timeline_client::list_remote_tenant_shards;
|
||||
use crate::tenant::remote_timeline_client::list_remote_timelines;
|
||||
use crate::tenant::secondary::SecondaryController;
|
||||
use crate::tenant::size::ModelInputs;
|
||||
use crate::tenant::storage_layer::LayerAccessStatsReset;
|
||||
@@ -2035,6 +2041,79 @@ async fn secondary_upload_handler(
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
async fn tenant_scan_remote_handler(
|
||||
request: Request<Body>,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let state = get_state(&request);
|
||||
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
|
||||
|
||||
let Some(remote_storage) = state.remote_storage.as_ref() else {
|
||||
return Err(ApiError::BadRequest(anyhow::anyhow!(
|
||||
"Remote storage not configured"
|
||||
)));
|
||||
};
|
||||
|
||||
let mut response = TenantScanRemoteStorageResponse::default();
|
||||
|
||||
let (shards, _other_keys) =
|
||||
list_remote_tenant_shards(remote_storage, tenant_id, cancel.clone())
|
||||
.await
|
||||
.map_err(|e| ApiError::InternalServerError(anyhow::anyhow!(e)))?;
|
||||
|
||||
for tenant_shard_id in shards {
|
||||
let (timeline_ids, _other_keys) =
|
||||
list_remote_timelines(remote_storage, tenant_shard_id, cancel.clone())
|
||||
.await
|
||||
.map_err(|e| ApiError::InternalServerError(anyhow::anyhow!(e)))?;
|
||||
|
||||
let mut generation = Generation::none();
|
||||
for timeline_id in timeline_ids {
|
||||
match download_index_part(
|
||||
remote_storage,
|
||||
&tenant_shard_id,
|
||||
&timeline_id,
|
||||
Generation::MAX,
|
||||
&cancel,
|
||||
)
|
||||
.instrument(info_span!("download_index_part",
|
||||
tenant_id=%tenant_shard_id.tenant_id,
|
||||
shard_id=%tenant_shard_id.shard_slug(),
|
||||
%timeline_id))
|
||||
.await
|
||||
{
|
||||
Ok((index_part, index_generation)) => {
|
||||
tracing::info!("Found timeline {tenant_shard_id}/{timeline_id} metadata (gen {index_generation:?}, {} layers, {} consistent LSN)",
|
||||
index_part.layer_metadata.len(), index_part.get_disk_consistent_lsn());
|
||||
generation = std::cmp::max(generation, index_generation);
|
||||
}
|
||||
Err(DownloadError::NotFound) => {
|
||||
// This is normal for tenants that were created with multiple shards: they have an unsharded path
|
||||
// containing the timeline's initdb tarball but no index. Otherwise it is a bit strange.
|
||||
tracing::info!("Timeline path {tenant_shard_id}/{timeline_id} exists in remote storage but has no index, skipping");
|
||||
continue;
|
||||
}
|
||||
Err(e) => {
|
||||
return Err(ApiError::InternalServerError(anyhow::anyhow!(e)));
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
response.shards.push(TenantScanRemoteStorageShard {
|
||||
tenant_shard_id,
|
||||
generation: generation.into(),
|
||||
});
|
||||
}
|
||||
|
||||
if response.shards.is_empty() {
|
||||
return Err(ApiError::NotFound(
|
||||
anyhow::anyhow!("No shards found for tenant ID {tenant_id}").into(),
|
||||
));
|
||||
}
|
||||
|
||||
json_response(StatusCode::OK, response)
|
||||
}
|
||||
|
||||
async fn secondary_download_handler(
|
||||
request: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
@@ -2431,6 +2510,9 @@ pub fn make_router(
|
||||
.post("/v1/tenant/:tenant_shard_id/heatmap_upload", |r| {
|
||||
api_handler(r, secondary_upload_handler)
|
||||
})
|
||||
.get("/v1/tenant/:tenant_id/scan_remote_storage", |r| {
|
||||
api_handler(r, tenant_scan_remote_handler)
|
||||
})
|
||||
.put("/v1/disk_usage_eviction/run", |r| {
|
||||
api_handler(r, disk_usage_eviction_run)
|
||||
})
|
||||
|
||||
@@ -51,6 +51,9 @@ pub(crate) enum StorageTimeOperation {
|
||||
#[strum(serialize = "gc")]
|
||||
Gc,
|
||||
|
||||
#[strum(serialize = "update gc info")]
|
||||
UpdateGcInfo,
|
||||
|
||||
#[strum(serialize = "create tenant")]
|
||||
CreateTenant,
|
||||
}
|
||||
@@ -1910,6 +1913,22 @@ impl StorageTimeMetricsTimer {
|
||||
self.metrics.timeline_count.inc();
|
||||
self.metrics.global_histogram.observe(duration);
|
||||
}
|
||||
|
||||
/// Turns this timer into a timer, which will always record -- usually this means recording
|
||||
/// regardless an early `?` path was taken in a function.
|
||||
pub(crate) fn record_on_drop(self) -> AlwaysRecordingStorageTimeMetricsTimer {
|
||||
AlwaysRecordingStorageTimeMetricsTimer(Some(self))
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct AlwaysRecordingStorageTimeMetricsTimer(Option<StorageTimeMetricsTimer>);
|
||||
|
||||
impl Drop for AlwaysRecordingStorageTimeMetricsTimer {
|
||||
fn drop(&mut self) {
|
||||
if let Some(inner) = self.0.take() {
|
||||
inner.stop_and_record();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Timing facilities for an globally histogrammed metric, which is supported by per tenant and
|
||||
@@ -1970,6 +1989,7 @@ pub(crate) struct TimelineMetrics {
|
||||
pub imitate_logical_size_histo: StorageTimeMetrics,
|
||||
pub load_layer_map_histo: StorageTimeMetrics,
|
||||
pub garbage_collect_histo: StorageTimeMetrics,
|
||||
pub update_gc_info_histo: StorageTimeMetrics,
|
||||
pub last_record_gauge: IntGauge,
|
||||
resident_physical_size_gauge: UIntGauge,
|
||||
/// copy of LayeredTimeline.current_logical_size
|
||||
@@ -2030,6 +2050,12 @@ impl TimelineMetrics {
|
||||
&shard_id,
|
||||
&timeline_id,
|
||||
);
|
||||
let update_gc_info_histo = StorageTimeMetrics::new(
|
||||
StorageTimeOperation::UpdateGcInfo,
|
||||
&tenant_id,
|
||||
&shard_id,
|
||||
&timeline_id,
|
||||
);
|
||||
let last_record_gauge = LAST_RECORD_LSN
|
||||
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
|
||||
.unwrap();
|
||||
@@ -2072,6 +2098,7 @@ impl TimelineMetrics {
|
||||
logical_size_histo,
|
||||
imitate_logical_size_histo,
|
||||
garbage_collect_histo,
|
||||
update_gc_info_histo,
|
||||
load_layer_map_histo,
|
||||
last_record_gauge,
|
||||
resident_physical_size_gauge,
|
||||
|
||||
@@ -1368,7 +1368,8 @@ impl PageServerHandler {
|
||||
}
|
||||
}
|
||||
|
||||
impl<IO> postgres_backend::HandlerSync<IO> for PageServerHandler
|
||||
#[async_trait::async_trait]
|
||||
impl<IO> postgres_backend::Handler<IO> for PageServerHandler
|
||||
where
|
||||
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
|
||||
{
|
||||
@@ -1408,24 +1409,9 @@ where
|
||||
) -> Result<(), QueryError> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
type IO<'s> = std::pin::Pin<&'s mut tokio_io_timeout::TimeoutReader<tokio::net::TcpStream>>;
|
||||
|
||||
impl<'s> postgres_backend::Handler<IO<'s>> for PageServerHandler
|
||||
{
|
||||
#[instrument(skip_all, fields(tenant_id, timeline_id))]
|
||||
async fn process_query(
|
||||
&mut self,
|
||||
pgb: &mut PostgresBackend<IO<'s>>,
|
||||
query_string: &str,
|
||||
) -> Result<(), QueryError> {
|
||||
self.process_query_(pgb, &query_string).await
|
||||
}
|
||||
}
|
||||
|
||||
impl PageServerHandler {
|
||||
async fn process_query_<IO: AsyncRead + AsyncWrite + Send + Sync + Unpin>(
|
||||
&mut self,
|
||||
pgb: &mut PostgresBackend<IO>,
|
||||
query_string: &str,
|
||||
|
||||
@@ -445,11 +445,6 @@ impl Timeline {
|
||||
// include physical changes from later commits that will be marked
|
||||
// as aborted, and will need to be vacuumed away.
|
||||
let commit_lsn = Lsn((low - 1) * 8);
|
||||
// This maxing operation is for the edge case that the search above did
|
||||
// set found_smaller to true but it never increased the lsn. Then, low
|
||||
// is still the old min_lsn the subtraction above could possibly give a value
|
||||
// below the anchestor_lsn.
|
||||
let commit_lsn = commit_lsn.max(min_lsn);
|
||||
match (found_smaller, found_larger) {
|
||||
(false, false) => {
|
||||
// This can happen if no commit records have been processed yet, e.g.
|
||||
@@ -460,6 +455,12 @@ impl Timeline {
|
||||
// Didn't find any commit timestamps smaller than the request
|
||||
Ok(LsnForTimestamp::Past(min_lsn))
|
||||
}
|
||||
(true, _) if commit_lsn < min_lsn => {
|
||||
// the search above did set found_smaller to true but it never increased the lsn.
|
||||
// Then, low is still the old min_lsn, and the subtraction above gave a value
|
||||
// below the min_lsn. We should never do that.
|
||||
Ok(LsnForTimestamp::Past(min_lsn))
|
||||
}
|
||||
(true, false) => {
|
||||
// Only found commits with timestamps smaller than the request.
|
||||
// It's still a valid case for branch creation, return it.
|
||||
|
||||
@@ -361,6 +361,8 @@ pub enum TaskKind {
|
||||
|
||||
DebugTool,
|
||||
|
||||
EphemeralFilePreWarmPageCache,
|
||||
|
||||
#[cfg(test)]
|
||||
UnitTest,
|
||||
}
|
||||
|
||||
@@ -888,7 +888,7 @@ impl Tenant {
|
||||
|
||||
#[instrument(skip_all)]
|
||||
pub(crate) async fn preload(
|
||||
self: &Arc<Tenant>,
|
||||
self: &Arc<Self>,
|
||||
remote_storage: &GenericRemoteStorage,
|
||||
cancel: CancellationToken,
|
||||
) -> anyhow::Result<TenantPreload> {
|
||||
@@ -918,9 +918,13 @@ impl Tenant {
|
||||
|
||||
Ok(TenantPreload {
|
||||
deleting,
|
||||
timelines: self
|
||||
.load_timeline_metadata(remote_timeline_ids, remote_storage, cancel)
|
||||
.await?,
|
||||
timelines: Self::load_timeline_metadata(
|
||||
self,
|
||||
remote_timeline_ids,
|
||||
remote_storage,
|
||||
cancel,
|
||||
)
|
||||
.await?,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -3402,7 +3406,11 @@ impl Tenant {
|
||||
// is in progress (which is not a common case).
|
||||
//
|
||||
// See more for on the issue #2748 condenced out of the initial PR review.
|
||||
let mut shared_cache = self.cached_logical_sizes.lock().await;
|
||||
let mut shared_cache = tokio::select! {
|
||||
locked = self.cached_logical_sizes.lock() => locked,
|
||||
_ = cancel.cancelled() => anyhow::bail!("cancelled"),
|
||||
_ = self.cancel.cancelled() => anyhow::bail!("tenant is shutting down"),
|
||||
};
|
||||
|
||||
size::gather_inputs(
|
||||
self,
|
||||
@@ -3664,6 +3672,7 @@ pub(crate) mod harness {
|
||||
image_layer_creation_check_threshold: Some(
|
||||
tenant_conf.image_layer_creation_check_threshold,
|
||||
),
|
||||
switch_to_aux_file_v2: Some(tenant_conf.switch_to_aux_file_v2),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -121,7 +121,7 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
|
||||
self.offset
|
||||
}
|
||||
|
||||
const CAPACITY: usize = if BUFFERED { PAGE_SZ } else { 0 };
|
||||
const CAPACITY: usize = if BUFFERED { 64 * 1024 } else { 0 };
|
||||
|
||||
/// Writes the given buffer directly to the underlying `VirtualFile`.
|
||||
/// You need to make sure that the internal buffer is empty, otherwise
|
||||
|
||||
@@ -369,6 +369,10 @@ pub struct TenantConf {
|
||||
// How much WAL must be ingested before checking again whether a new image layer is required.
|
||||
// Expresed in multiples of checkpoint distance.
|
||||
pub image_layer_creation_check_threshold: u8,
|
||||
|
||||
/// Switch to aux file v2. Switching this flag requires the user has not written any aux file into
|
||||
/// the storage before, and this flag cannot be switched back. Otherwise there will be data corruptions.
|
||||
pub switch_to_aux_file_v2: bool,
|
||||
}
|
||||
|
||||
/// Same as TenantConf, but this struct preserves the information about
|
||||
@@ -464,6 +468,10 @@ pub struct TenantConfOpt {
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub image_layer_creation_check_threshold: Option<u8>,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(default)]
|
||||
pub switch_to_aux_file_v2: Option<bool>,
|
||||
}
|
||||
|
||||
impl TenantConfOpt {
|
||||
@@ -521,6 +529,9 @@ impl TenantConfOpt {
|
||||
image_layer_creation_check_threshold: self
|
||||
.image_layer_creation_check_threshold
|
||||
.unwrap_or(global_conf.image_layer_creation_check_threshold),
|
||||
switch_to_aux_file_v2: self
|
||||
.switch_to_aux_file_v2
|
||||
.unwrap_or(global_conf.switch_to_aux_file_v2),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -562,6 +573,7 @@ impl Default for TenantConf {
|
||||
lazy_slru_download: false,
|
||||
timeline_get_throttle: crate::tenant::throttle::Config::disabled(),
|
||||
image_layer_creation_check_threshold: DEFAULT_IMAGE_LAYER_CREATION_CHECK_THRESHOLD,
|
||||
switch_to_aux_file_v2: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -636,6 +648,7 @@ impl From<TenantConfOpt> for models::TenantConfig {
|
||||
lazy_slru_download: value.lazy_slru_download,
|
||||
timeline_get_throttle: value.timeline_get_throttle.map(ThrottleConfig::from),
|
||||
image_layer_creation_check_threshold: value.image_layer_creation_check_threshold,
|
||||
switch_to_aux_file_v2: value.switch_to_aux_file_v2,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,36 +3,26 @@
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::RequestContext;
|
||||
use crate::page_cache::{self, PAGE_SZ};
|
||||
use crate::page_cache;
|
||||
use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader};
|
||||
use crate::virtual_file::{self, VirtualFile};
|
||||
use bytes::BytesMut;
|
||||
use camino::Utf8PathBuf;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use std::cmp::min;
|
||||
|
||||
use std::io::{self, ErrorKind};
|
||||
use std::ops::DerefMut;
|
||||
use std::io;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use tracing::*;
|
||||
use utils::id::TimelineId;
|
||||
|
||||
pub struct EphemeralFile {
|
||||
page_cache_file_id: page_cache::FileId,
|
||||
|
||||
_tenant_shard_id: TenantShardId,
|
||||
_timeline_id: TimelineId,
|
||||
file: VirtualFile,
|
||||
len: u64,
|
||||
/// An ephemeral file is append-only.
|
||||
/// We keep the last page, which can still be modified, in [`Self::mutable_tail`].
|
||||
/// The other pages, which can no longer be modified, are accessed through the page cache.
|
||||
///
|
||||
/// None <=> IO is ongoing.
|
||||
/// Size is fixed to PAGE_SZ at creation time and must not be changed.
|
||||
mutable_tail: Option<BytesMut>,
|
||||
|
||||
rw: page_caching::RW,
|
||||
}
|
||||
|
||||
mod page_caching;
|
||||
mod zero_padded_read_write;
|
||||
|
||||
impl EphemeralFile {
|
||||
pub async fn create(
|
||||
conf: &PageServerConf,
|
||||
@@ -59,21 +49,18 @@ impl EphemeralFile {
|
||||
.await?;
|
||||
|
||||
Ok(EphemeralFile {
|
||||
page_cache_file_id: page_cache::next_file_id(),
|
||||
_tenant_shard_id: tenant_shard_id,
|
||||
_timeline_id: timeline_id,
|
||||
file,
|
||||
len: 0,
|
||||
mutable_tail: Some(BytesMut::zeroed(PAGE_SZ)),
|
||||
rw: page_caching::RW::new(file),
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn len(&self) -> u64 {
|
||||
self.len
|
||||
self.rw.bytes_written()
|
||||
}
|
||||
|
||||
pub(crate) fn id(&self) -> page_cache::FileId {
|
||||
self.page_cache_file_id
|
||||
pub(crate) fn page_cache_file_id(&self) -> page_cache::FileId {
|
||||
self.rw.page_cache_file_id()
|
||||
}
|
||||
|
||||
pub(crate) async fn read_blk(
|
||||
@@ -81,182 +68,30 @@ impl EphemeralFile {
|
||||
blknum: u32,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<BlockLease, io::Error> {
|
||||
let flushed_blknums = 0..self.len / PAGE_SZ as u64;
|
||||
if flushed_blknums.contains(&(blknum as u64)) {
|
||||
let cache = page_cache::get();
|
||||
match cache
|
||||
.read_immutable_buf(self.page_cache_file_id, blknum, ctx)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
std::io::Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
// order path before error because error is anyhow::Error => might have many contexts
|
||||
format!(
|
||||
"ephemeral file: read immutable page #{}: {}: {:#}",
|
||||
blknum, self.file.path, e,
|
||||
),
|
||||
)
|
||||
})? {
|
||||
page_cache::ReadBufResult::Found(guard) => {
|
||||
return Ok(BlockLease::PageReadGuard(guard))
|
||||
}
|
||||
page_cache::ReadBufResult::NotFound(write_guard) => {
|
||||
let write_guard = self
|
||||
.file
|
||||
.read_exact_at_page(write_guard, blknum as u64 * PAGE_SZ as u64)
|
||||
.await?;
|
||||
let read_guard = write_guard.mark_valid();
|
||||
return Ok(BlockLease::PageReadGuard(read_guard));
|
||||
}
|
||||
};
|
||||
} else {
|
||||
debug_assert_eq!(blknum as u64, self.len / PAGE_SZ as u64);
|
||||
Ok(BlockLease::EphemeralFileMutableTail(
|
||||
self.mutable_tail
|
||||
.as_deref()
|
||||
.expect("we're not doing IO, it must be Some()")
|
||||
.try_into()
|
||||
.expect("we ensure that it's always PAGE_SZ"),
|
||||
))
|
||||
}
|
||||
self.rw.read_blk(blknum, ctx).await
|
||||
}
|
||||
|
||||
pub(crate) async fn write_blob(
|
||||
&mut self,
|
||||
srcbuf: &[u8],
|
||||
ctx: &RequestContext,
|
||||
_ctx: &RequestContext,
|
||||
) -> Result<u64, io::Error> {
|
||||
struct Writer<'a> {
|
||||
ephemeral_file: &'a mut EphemeralFile,
|
||||
/// The block to which the next [`push_bytes`] will write.
|
||||
blknum: u32,
|
||||
/// The offset inside the block identified by [`blknum`] to which [`push_bytes`] will write.
|
||||
off: usize,
|
||||
}
|
||||
impl<'a> Writer<'a> {
|
||||
fn new(ephemeral_file: &'a mut EphemeralFile) -> io::Result<Writer<'a>> {
|
||||
Ok(Writer {
|
||||
blknum: (ephemeral_file.len / PAGE_SZ as u64) as u32,
|
||||
off: (ephemeral_file.len % PAGE_SZ as u64) as usize,
|
||||
ephemeral_file,
|
||||
})
|
||||
}
|
||||
#[inline(always)]
|
||||
async fn push_bytes(
|
||||
&mut self,
|
||||
src: &[u8],
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), io::Error> {
|
||||
let mut src_remaining = src;
|
||||
while !src_remaining.is_empty() {
|
||||
let dst_remaining = &mut self
|
||||
.ephemeral_file
|
||||
.mutable_tail
|
||||
.as_deref_mut()
|
||||
.expect("IO is not yet ongoing")[self.off..];
|
||||
let n = min(dst_remaining.len(), src_remaining.len());
|
||||
dst_remaining[..n].copy_from_slice(&src_remaining[..n]);
|
||||
self.off += n;
|
||||
src_remaining = &src_remaining[n..];
|
||||
if self.off == PAGE_SZ {
|
||||
let mutable_tail = std::mem::take(&mut self.ephemeral_file.mutable_tail)
|
||||
.expect("IO is not yet ongoing");
|
||||
let (mutable_tail, res) = self
|
||||
.ephemeral_file
|
||||
.file
|
||||
.write_all_at(mutable_tail, self.blknum as u64 * PAGE_SZ as u64)
|
||||
.await;
|
||||
// TODO: If we panic before we can put the mutable_tail back, subsequent calls will fail.
|
||||
// I.e., the IO isn't retryable if we panic.
|
||||
self.ephemeral_file.mutable_tail = Some(mutable_tail);
|
||||
match res {
|
||||
Ok(_) => {
|
||||
// Pre-warm the page cache with what we just wrote.
|
||||
// This isn't necessary for coherency/correctness, but it's how we've always done it.
|
||||
let cache = page_cache::get();
|
||||
match cache
|
||||
.read_immutable_buf(
|
||||
self.ephemeral_file.page_cache_file_id,
|
||||
self.blknum,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(page_cache::ReadBufResult::Found(_guard)) => {
|
||||
// This function takes &mut self, so, it shouldn't be possible to reach this point.
|
||||
unreachable!("we just wrote blknum {} and this function takes &mut self, so, no concurrent read_blk is possible", self.blknum);
|
||||
}
|
||||
Ok(page_cache::ReadBufResult::NotFound(mut write_guard)) => {
|
||||
let buf: &mut [u8] = write_guard.deref_mut();
|
||||
debug_assert_eq!(buf.len(), PAGE_SZ);
|
||||
buf.copy_from_slice(
|
||||
self.ephemeral_file
|
||||
.mutable_tail
|
||||
.as_deref()
|
||||
.expect("IO is not ongoing"),
|
||||
);
|
||||
let _ = write_guard.mark_valid();
|
||||
// pre-warm successful
|
||||
}
|
||||
Err(e) => {
|
||||
error!("ephemeral_file write_blob failed to get immutable buf to pre-warm page cache: {e:?}");
|
||||
// fail gracefully, it's not the end of the world if we can't pre-warm the cache here
|
||||
}
|
||||
}
|
||||
// Zero the buffer for re-use.
|
||||
// Zeroing is critical for correcntess because the write_blob code below
|
||||
// and similarly read_blk expect zeroed pages.
|
||||
self.ephemeral_file
|
||||
.mutable_tail
|
||||
.as_deref_mut()
|
||||
.expect("IO is not ongoing")
|
||||
.fill(0);
|
||||
// This block is done, move to next one.
|
||||
self.blknum += 1;
|
||||
self.off = 0;
|
||||
}
|
||||
Err(e) => {
|
||||
return Err(std::io::Error::new(
|
||||
ErrorKind::Other,
|
||||
// order error before path because path is long and error is short
|
||||
format!(
|
||||
"ephemeral_file: write_blob: write-back full tail blk #{}: {:#}: {}",
|
||||
self.blknum,
|
||||
e,
|
||||
self.ephemeral_file.file.path,
|
||||
),
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
let pos = self.len;
|
||||
let mut writer = Writer::new(self)?;
|
||||
let pos = self.rw.bytes_written();
|
||||
|
||||
// Write the length field
|
||||
if srcbuf.len() < 0x80 {
|
||||
// short one-byte length header
|
||||
let len_buf = [srcbuf.len() as u8];
|
||||
writer.push_bytes(&len_buf, ctx).await?;
|
||||
|
||||
self.rw.write_all_borrowed(&len_buf).await?;
|
||||
} else {
|
||||
let mut len_buf = u32::to_be_bytes(srcbuf.len() as u32);
|
||||
len_buf[0] |= 0x80;
|
||||
writer.push_bytes(&len_buf, ctx).await?;
|
||||
self.rw.write_all_borrowed(&len_buf).await?;
|
||||
}
|
||||
|
||||
// Write the payload
|
||||
writer.push_bytes(srcbuf, ctx).await?;
|
||||
|
||||
if srcbuf.len() < 0x80 {
|
||||
self.len += 1;
|
||||
} else {
|
||||
self.len += 4;
|
||||
}
|
||||
self.len += srcbuf.len() as u64;
|
||||
self.rw.write_all_borrowed(srcbuf).await?;
|
||||
|
||||
Ok(pos)
|
||||
}
|
||||
@@ -271,28 +106,6 @@ pub fn is_ephemeral_file(filename: &str) -> bool {
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for EphemeralFile {
|
||||
fn drop(&mut self) {
|
||||
// There might still be pages in the [`crate::page_cache`] for this file.
|
||||
// We leave them there, [`crate::page_cache::PageCache::find_victim`] will evict them when needed.
|
||||
|
||||
// unlink the file
|
||||
let res = std::fs::remove_file(&self.file.path);
|
||||
if let Err(e) = res {
|
||||
if e.kind() != std::io::ErrorKind::NotFound {
|
||||
// just never log the not found errors, we cannot do anything for them; on detach
|
||||
// the tenant directory is already gone.
|
||||
//
|
||||
// not found files might also be related to https://github.com/neondatabase/neon/issues/2442
|
||||
error!(
|
||||
"could not remove ephemeral file '{}': {}",
|
||||
self.file.path, e
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl BlockReader for EphemeralFile {
|
||||
fn block_cursor(&self) -> super::block_io::BlockCursor<'_> {
|
||||
BlockCursor::new(super::block_io::BlockReaderRef::EphemeralFile(self))
|
||||
|
||||
218
pageserver/src/tenant/ephemeral_file/page_caching.rs
Normal file
218
pageserver/src/tenant/ephemeral_file/page_caching.rs
Normal file
@@ -0,0 +1,218 @@
|
||||
//! Wrapper around [`super::zero_padded_read_write::RW`] that uses the
|
||||
//! [`crate::page_cache`] to serve reads that need to go to the underlying [`VirtualFile`].
|
||||
|
||||
use crate::context::RequestContext;
|
||||
use crate::page_cache::{self, PAGE_SZ};
|
||||
use crate::tenant::block_io::BlockLease;
|
||||
use crate::virtual_file::VirtualFile;
|
||||
|
||||
use once_cell::sync::Lazy;
|
||||
use std::io::{self, ErrorKind};
|
||||
use tokio_epoll_uring::BoundedBuf;
|
||||
use tracing::*;
|
||||
|
||||
use super::zero_padded_read_write;
|
||||
|
||||
/// See module-level comment.
|
||||
pub struct RW {
|
||||
page_cache_file_id: page_cache::FileId,
|
||||
rw: super::zero_padded_read_write::RW<PreWarmingWriter>,
|
||||
}
|
||||
|
||||
impl RW {
|
||||
pub fn new(file: VirtualFile) -> Self {
|
||||
let page_cache_file_id = page_cache::next_file_id();
|
||||
Self {
|
||||
page_cache_file_id,
|
||||
rw: super::zero_padded_read_write::RW::new(PreWarmingWriter::new(
|
||||
page_cache_file_id,
|
||||
file,
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn page_cache_file_id(&self) -> page_cache::FileId {
|
||||
self.page_cache_file_id
|
||||
}
|
||||
|
||||
pub(crate) async fn write_all_borrowed(&mut self, srcbuf: &[u8]) -> Result<usize, io::Error> {
|
||||
// It doesn't make sense to proactively fill the page cache on the Pageserver write path
|
||||
// because Compute is unlikely to access recently written data.
|
||||
self.rw.write_all_borrowed(srcbuf).await
|
||||
}
|
||||
|
||||
pub(crate) fn bytes_written(&self) -> u64 {
|
||||
self.rw.bytes_written()
|
||||
}
|
||||
|
||||
pub(crate) async fn read_blk(
|
||||
&self,
|
||||
blknum: u32,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<BlockLease, io::Error> {
|
||||
match self.rw.read_blk(blknum).await? {
|
||||
zero_padded_read_write::ReadResult::NeedsReadFromWriter { writer } => {
|
||||
let cache = page_cache::get();
|
||||
match cache
|
||||
.read_immutable_buf(self.page_cache_file_id, blknum, ctx)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
std::io::Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
// order path before error because error is anyhow::Error => might have many contexts
|
||||
format!(
|
||||
"ephemeral file: read immutable page #{}: {}: {:#}",
|
||||
blknum,
|
||||
self.rw.as_writer().file.path,
|
||||
e,
|
||||
),
|
||||
)
|
||||
})? {
|
||||
page_cache::ReadBufResult::Found(guard) => {
|
||||
return Ok(BlockLease::PageReadGuard(guard))
|
||||
}
|
||||
page_cache::ReadBufResult::NotFound(write_guard) => {
|
||||
let write_guard = writer
|
||||
.file
|
||||
.read_exact_at_page(write_guard, blknum as u64 * PAGE_SZ as u64)
|
||||
.await?;
|
||||
let read_guard = write_guard.mark_valid();
|
||||
return Ok(BlockLease::PageReadGuard(read_guard));
|
||||
}
|
||||
}
|
||||
}
|
||||
zero_padded_read_write::ReadResult::ServedFromZeroPaddedMutableTail { buffer } => {
|
||||
Ok(BlockLease::EphemeralFileMutableTail(buffer))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for RW {
|
||||
fn drop(&mut self) {
|
||||
// There might still be pages in the [`crate::page_cache`] for this file.
|
||||
// We leave them there, [`crate::page_cache::PageCache::find_victim`] will evict them when needed.
|
||||
|
||||
// unlink the file
|
||||
let res = std::fs::remove_file(&self.rw.as_writer().file.path);
|
||||
if let Err(e) = res {
|
||||
if e.kind() != std::io::ErrorKind::NotFound {
|
||||
// just never log the not found errors, we cannot do anything for them; on detach
|
||||
// the tenant directory is already gone.
|
||||
//
|
||||
// not found files might also be related to https://github.com/neondatabase/neon/issues/2442
|
||||
error!(
|
||||
"could not remove ephemeral file '{}': {}",
|
||||
self.rw.as_writer().file.path,
|
||||
e
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct PreWarmingWriter {
|
||||
nwritten_blocks: u32,
|
||||
page_cache_file_id: page_cache::FileId,
|
||||
file: VirtualFile,
|
||||
}
|
||||
|
||||
impl PreWarmingWriter {
|
||||
fn new(page_cache_file_id: page_cache::FileId, file: VirtualFile) -> Self {
|
||||
Self {
|
||||
nwritten_blocks: 0,
|
||||
page_cache_file_id,
|
||||
file,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl crate::virtual_file::owned_buffers_io::write::OwnedAsyncWriter for PreWarmingWriter {
|
||||
async fn write_all<
|
||||
B: tokio_epoll_uring::BoundedBuf<Buf = Buf>,
|
||||
Buf: tokio_epoll_uring::IoBuf + Send,
|
||||
>(
|
||||
&mut self,
|
||||
buf: B,
|
||||
) -> std::io::Result<(usize, B::Buf)> {
|
||||
let buf = buf.slice(..);
|
||||
let saved_bounds = buf.bounds(); // save for reconstructing the Slice from iobuf after the IO is done
|
||||
let check_bounds_stuff_works = if cfg!(test) && cfg!(debug_assertions) {
|
||||
Some(buf.to_vec())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let buflen = buf.len();
|
||||
assert_eq!(
|
||||
buflen % PAGE_SZ,
|
||||
0,
|
||||
"{buflen} ; we know TAIL_SZ is a PAGE_SZ multiple, and write_buffered_borrowed is used"
|
||||
);
|
||||
|
||||
// Do the IO.
|
||||
let iobuf = match self.file.write_all(buf).await {
|
||||
(iobuf, Ok(nwritten)) => {
|
||||
assert_eq!(nwritten, buflen);
|
||||
iobuf
|
||||
}
|
||||
(_, Err(e)) => {
|
||||
return Err(std::io::Error::new(
|
||||
ErrorKind::Other,
|
||||
// order error before path because path is long and error is short
|
||||
format!(
|
||||
"ephemeral_file: write_blob: write-back tail self.nwritten_blocks={}, buflen={}, {:#}: {}",
|
||||
self.nwritten_blocks, buflen, e, self.file.path,
|
||||
),
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
// Reconstruct the Slice (the write path consumed the Slice and returned us the underlying IoBuf)
|
||||
let buf = tokio_epoll_uring::Slice::from_buf_bounds(iobuf, saved_bounds);
|
||||
if let Some(check_bounds_stuff_works) = check_bounds_stuff_works {
|
||||
assert_eq!(&check_bounds_stuff_works, &*buf);
|
||||
}
|
||||
|
||||
// Pre-warm page cache with the contents.
|
||||
// At least in isolated bulk ingest benchmarks (test_bulk_insert.py), the pre-warming
|
||||
// benefits the code that writes InMemoryLayer=>L0 layers.
|
||||
let nblocks = buflen / PAGE_SZ;
|
||||
let nblocks32 = u32::try_from(nblocks).unwrap();
|
||||
let cache = page_cache::get();
|
||||
static CTX: Lazy<RequestContext> = Lazy::new(|| {
|
||||
RequestContext::new(
|
||||
crate::task_mgr::TaskKind::EphemeralFilePreWarmPageCache,
|
||||
crate::context::DownloadBehavior::Error,
|
||||
)
|
||||
});
|
||||
for blknum_in_buffer in 0..nblocks {
|
||||
let blk_in_buffer = &buf[blknum_in_buffer * PAGE_SZ..(blknum_in_buffer + 1) * PAGE_SZ];
|
||||
let blknum = self
|
||||
.nwritten_blocks
|
||||
.checked_add(blknum_in_buffer as u32)
|
||||
.unwrap();
|
||||
match cache
|
||||
.read_immutable_buf(self.page_cache_file_id, blknum, &CTX)
|
||||
.await
|
||||
{
|
||||
Err(e) => {
|
||||
error!("ephemeral_file write_blob failed to get immutable buf to pre-warm page cache: {e:?}");
|
||||
// fail gracefully, it's not the end of the world if we can't pre-warm the cache here
|
||||
}
|
||||
Ok(v) => match v {
|
||||
page_cache::ReadBufResult::Found(_guard) => {
|
||||
// This function takes &mut self, so, it shouldn't be possible to reach this point.
|
||||
unreachable!("we just wrote block {blknum} to the VirtualFile, which is owned by Self, \
|
||||
and this function takes &mut self, so, no concurrent read_blk is possible");
|
||||
}
|
||||
page_cache::ReadBufResult::NotFound(mut write_guard) => {
|
||||
write_guard.copy_from_slice(blk_in_buffer);
|
||||
let _ = write_guard.mark_valid();
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
self.nwritten_blocks = self.nwritten_blocks.checked_add(nblocks32).unwrap();
|
||||
Ok((buflen, buf.into_inner()))
|
||||
}
|
||||
}
|
||||
125
pageserver/src/tenant/ephemeral_file/zero_padded_read_write.rs
Normal file
125
pageserver/src/tenant/ephemeral_file/zero_padded_read_write.rs
Normal file
@@ -0,0 +1,125 @@
|
||||
//! The heart of how [`super::EphemeralFile`] does its reads and writes.
|
||||
//!
|
||||
//! # Writes
|
||||
//!
|
||||
//! [`super::EphemeralFile`] writes small, borrowed buffers using [`RW::write_all_borrowed`].
|
||||
//! The [`RW`] batches these into [`TAIL_SZ`] bigger writes, using [`owned_buffers_io::write::BufferedWriter`].
|
||||
//!
|
||||
//! # Reads
|
||||
//!
|
||||
//! [`super::EphemeralFile`] always reads full [`PAGE_SZ`]ed blocks using [`RW::read_blk`].
|
||||
//!
|
||||
//! The [`RW`] serves these reads either from the buffered writer's in-memory buffer
|
||||
//! or redirects the caller to read from the underlying [`OwnedAsyncWriter`]
|
||||
//! if the read is for the prefix that has already been flushed.
|
||||
//!
|
||||
//! # Current Usage
|
||||
//!
|
||||
//! The current user of this module is [`super::page_caching::RW`].
|
||||
|
||||
mod zero_padded;
|
||||
|
||||
use crate::{
|
||||
page_cache::PAGE_SZ,
|
||||
virtual_file::owned_buffers_io::{
|
||||
self,
|
||||
write::{Buffer, OwnedAsyncWriter},
|
||||
},
|
||||
};
|
||||
|
||||
const TAIL_SZ: usize = 64 * 1024;
|
||||
|
||||
/// See module-level comment.
|
||||
pub struct RW<W: OwnedAsyncWriter> {
|
||||
buffered_writer: owned_buffers_io::write::BufferedWriter<
|
||||
zero_padded::Buffer<TAIL_SZ>,
|
||||
owned_buffers_io::util::size_tracking_writer::Writer<W>,
|
||||
>,
|
||||
}
|
||||
|
||||
pub enum ReadResult<'a, W> {
|
||||
NeedsReadFromWriter { writer: &'a W },
|
||||
ServedFromZeroPaddedMutableTail { buffer: &'a [u8; PAGE_SZ] },
|
||||
}
|
||||
|
||||
impl<W> RW<W>
|
||||
where
|
||||
W: OwnedAsyncWriter,
|
||||
{
|
||||
pub fn new(writer: W) -> Self {
|
||||
let bytes_flushed_tracker =
|
||||
owned_buffers_io::util::size_tracking_writer::Writer::new(writer);
|
||||
let buffered_writer = owned_buffers_io::write::BufferedWriter::new(
|
||||
bytes_flushed_tracker,
|
||||
zero_padded::Buffer::default(),
|
||||
);
|
||||
Self { buffered_writer }
|
||||
}
|
||||
|
||||
pub(crate) fn as_writer(&self) -> &W {
|
||||
self.buffered_writer.as_inner().as_inner()
|
||||
}
|
||||
|
||||
pub async fn write_all_borrowed(&mut self, buf: &[u8]) -> std::io::Result<usize> {
|
||||
self.buffered_writer.write_buffered_borrowed(buf).await
|
||||
}
|
||||
|
||||
pub fn bytes_written(&self) -> u64 {
|
||||
let flushed_offset = self.buffered_writer.as_inner().bytes_written();
|
||||
let buffer: &zero_padded::Buffer<TAIL_SZ> = self.buffered_writer.inspect_buffer();
|
||||
flushed_offset + u64::try_from(buffer.pending()).unwrap()
|
||||
}
|
||||
|
||||
pub(crate) async fn read_blk(&self, blknum: u32) -> Result<ReadResult<'_, W>, std::io::Error> {
|
||||
let flushed_offset = self.buffered_writer.as_inner().bytes_written();
|
||||
let buffer: &zero_padded::Buffer<TAIL_SZ> = self.buffered_writer.inspect_buffer();
|
||||
let buffered_offset = flushed_offset + u64::try_from(buffer.pending()).unwrap();
|
||||
let read_offset = (blknum as u64) * (PAGE_SZ as u64);
|
||||
|
||||
// The trailing page ("block") might only be partially filled,
|
||||
// yet the blob_io code relies on us to return a full PAGE_SZed slice anyway.
|
||||
// Moreover, it has to be zero-padded, because when we still had
|
||||
// a write-back page cache, it provided pre-zeroed pages, and blob_io came to rely on it.
|
||||
// DeltaLayer probably has the same issue, not sure why it needs no special treatment.
|
||||
// => check here that the read doesn't go beyond this potentially trailing
|
||||
// => the zero-padding is done in the `else` branch below
|
||||
let blocks_written = if buffered_offset % (PAGE_SZ as u64) == 0 {
|
||||
buffered_offset / (PAGE_SZ as u64)
|
||||
} else {
|
||||
(buffered_offset / (PAGE_SZ as u64)) + 1
|
||||
};
|
||||
if (blknum as u64) >= blocks_written {
|
||||
return Err(std::io::Error::new(std::io::ErrorKind::Other, anyhow::anyhow!("read past end of ephemeral_file: read=0x{read_offset:x} buffered=0x{buffered_offset:x} flushed=0x{flushed_offset}")));
|
||||
}
|
||||
|
||||
// assertions for the `if-else` below
|
||||
assert_eq!(
|
||||
flushed_offset % (TAIL_SZ as u64), 0,
|
||||
"we only use write_buffered_borrowed to write to the buffered writer, so it's guaranteed that flushes happen buffer.cap()-sized chunks"
|
||||
);
|
||||
assert_eq!(
|
||||
flushed_offset % (PAGE_SZ as u64),
|
||||
0,
|
||||
"the logic below can't handle if the page is spread across the flushed part and the buffer"
|
||||
);
|
||||
|
||||
if read_offset < flushed_offset {
|
||||
assert!(read_offset + (PAGE_SZ as u64) <= flushed_offset);
|
||||
Ok(ReadResult::NeedsReadFromWriter {
|
||||
writer: self.as_writer(),
|
||||
})
|
||||
} else {
|
||||
let read_offset_in_buffer = read_offset
|
||||
.checked_sub(flushed_offset)
|
||||
.expect("would have taken `if` branch instead of this one");
|
||||
let read_offset_in_buffer = usize::try_from(read_offset_in_buffer).unwrap();
|
||||
let zero_padded_slice = buffer.as_zero_padded_slice();
|
||||
let page = &zero_padded_slice[read_offset_in_buffer..(read_offset_in_buffer + PAGE_SZ)];
|
||||
Ok(ReadResult::ServedFromZeroPaddedMutableTail {
|
||||
buffer: page
|
||||
.try_into()
|
||||
.expect("the slice above got it as page-size slice"),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,108 @@
|
||||
//! A [`crate::virtual_file::owned_buffers_io::write::Buffer`] whose
|
||||
//! unwritten range is guaranteed to be zero-initialized.
|
||||
//! This is used by [`crate::tenant::ephemeral_file::zero_padded_read_write::RW::read_blk`]
|
||||
//! to serve page-sized reads of the trailing page when the trailing page has only been partially filled.
|
||||
|
||||
use std::mem::MaybeUninit;
|
||||
|
||||
/// See module-level comment.
|
||||
pub struct Buffer<const N: usize> {
|
||||
allocation: Box<[u8; N]>,
|
||||
written: usize,
|
||||
}
|
||||
|
||||
impl<const N: usize> Default for Buffer<N> {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
allocation: Box::new(
|
||||
// SAFETY: zeroed memory is a valid [u8; N]
|
||||
unsafe { MaybeUninit::zeroed().assume_init() },
|
||||
),
|
||||
written: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<const N: usize> Buffer<N> {
|
||||
#[inline(always)]
|
||||
fn invariants(&self) {
|
||||
// don't check by default, unoptimized is too expensive even for debug mode
|
||||
if false {
|
||||
debug_assert!(self.written <= N, "{}", self.written);
|
||||
debug_assert!(self.allocation[self.written..N].iter().all(|v| *v == 0));
|
||||
}
|
||||
}
|
||||
|
||||
pub fn as_zero_padded_slice(&self) -> &[u8; N] {
|
||||
&self.allocation
|
||||
}
|
||||
}
|
||||
|
||||
impl<const N: usize> crate::virtual_file::owned_buffers_io::write::Buffer for Buffer<N> {
|
||||
type IoBuf = Self;
|
||||
|
||||
fn cap(&self) -> usize {
|
||||
self.allocation.len()
|
||||
}
|
||||
|
||||
fn extend_from_slice(&mut self, other: &[u8]) {
|
||||
self.invariants();
|
||||
let remaining = self.allocation.len() - self.written;
|
||||
if other.len() > remaining {
|
||||
panic!("calling extend_from_slice() with insufficient remaining capacity");
|
||||
}
|
||||
self.allocation[self.written..(self.written + other.len())].copy_from_slice(other);
|
||||
self.written += other.len();
|
||||
self.invariants();
|
||||
}
|
||||
|
||||
fn pending(&self) -> usize {
|
||||
self.written
|
||||
}
|
||||
|
||||
fn flush(self) -> tokio_epoll_uring::Slice<Self> {
|
||||
self.invariants();
|
||||
let written = self.written;
|
||||
tokio_epoll_uring::BoundedBuf::slice(self, 0..written)
|
||||
}
|
||||
|
||||
fn reuse_after_flush(iobuf: Self::IoBuf) -> Self {
|
||||
let Self {
|
||||
mut allocation,
|
||||
written,
|
||||
} = iobuf;
|
||||
allocation[0..written].fill(0);
|
||||
let new = Self {
|
||||
allocation,
|
||||
written: 0,
|
||||
};
|
||||
new.invariants();
|
||||
new
|
||||
}
|
||||
}
|
||||
|
||||
/// We have this trait impl so that the `flush` method in the `Buffer` impl above can produce a
|
||||
/// [`tokio_epoll_uring::BoundedBuf::slice`] of the [`Self::written`] range of the data.
|
||||
///
|
||||
/// Remember that bytes_init is generally _not_ a tracker of the amount
|
||||
/// of valid data in the io buffer; we use `Slice` for that.
|
||||
/// The `IoBuf` is _only_ for keeping track of uninitialized memory, a bit like MaybeUninit.
|
||||
///
|
||||
/// SAFETY:
|
||||
///
|
||||
/// The [`Self::allocation`] is stable becauses boxes are stable.
|
||||
/// The memory is zero-initialized, so, bytes_init is always N.
|
||||
unsafe impl<const N: usize> tokio_epoll_uring::IoBuf for Buffer<N> {
|
||||
fn stable_ptr(&self) -> *const u8 {
|
||||
self.allocation.as_ptr()
|
||||
}
|
||||
|
||||
fn bytes_init(&self) -> usize {
|
||||
// Yes, N, not self.written; Read the full comment of this impl block!
|
||||
N
|
||||
}
|
||||
|
||||
fn bytes_total(&self) -> usize {
|
||||
N
|
||||
}
|
||||
}
|
||||
@@ -243,7 +243,9 @@ use super::storage_layer::{Layer, LayerFileName, ResidentLayer};
|
||||
use super::upload_queue::SetDeletedFlagProgress;
|
||||
use super::Generation;
|
||||
|
||||
pub(crate) use download::{is_temp_download_file, list_remote_timelines};
|
||||
pub(crate) use download::{
|
||||
download_index_part, is_temp_download_file, list_remote_tenant_shards, list_remote_timelines,
|
||||
};
|
||||
pub(crate) use index::LayerFileMetadata;
|
||||
|
||||
// Occasional network issues and such can cause remote operations to fail, and
|
||||
@@ -472,7 +474,7 @@ impl RemoteTimelineClient {
|
||||
},
|
||||
);
|
||||
|
||||
let index_part = download::download_index_part(
|
||||
let (index_part, _index_generation) = download::download_index_part(
|
||||
&self.storage_impl,
|
||||
&self.tenant_shard_id,
|
||||
&self.timeline_id,
|
||||
@@ -1716,6 +1718,11 @@ impl RemoteTimelineClient {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn remote_tenant_path(tenant_shard_id: &TenantShardId) -> RemotePath {
|
||||
let path = format!("tenants/{tenant_shard_id}");
|
||||
RemotePath::from_string(&path).expect("Failed to construct path")
|
||||
}
|
||||
|
||||
pub fn remote_timelines_path(tenant_shard_id: &TenantShardId) -> RemotePath {
|
||||
let path = format!("tenants/{tenant_shard_id}/{TIMELINES_SEGMENT_NAME}");
|
||||
RemotePath::from_string(&path).expect("Failed to construct path")
|
||||
|
||||
@@ -5,9 +5,9 @@
|
||||
|
||||
use std::collections::HashSet;
|
||||
use std::future::Future;
|
||||
use std::str::FromStr;
|
||||
|
||||
use anyhow::{anyhow, Context};
|
||||
use bytes::BytesMut;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use tokio::fs::{self, File, OpenOptions};
|
||||
@@ -26,13 +26,13 @@ use crate::virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile};
|
||||
use crate::TEMP_FILE_SUFFIX;
|
||||
use remote_storage::{DownloadError, GenericRemoteStorage, ListingMode, RemotePath};
|
||||
use utils::crashsafe::path_with_suffix_extension;
|
||||
use utils::id::TimelineId;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
use super::index::{IndexPart, LayerFileMetadata};
|
||||
use super::{
|
||||
parse_remote_index_path, remote_index_path, remote_initdb_archive_path,
|
||||
remote_initdb_preserved_archive_path, FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES,
|
||||
INITDB_PATH,
|
||||
remote_initdb_preserved_archive_path, remote_tenant_path, FAILED_DOWNLOAD_WARN_THRESHOLD,
|
||||
FAILED_REMOTE_OP_RETRIES, INITDB_PATH,
|
||||
};
|
||||
|
||||
///
|
||||
@@ -254,42 +254,31 @@ pub(crate) fn is_temp_download_file(path: &Utf8Path) -> bool {
|
||||
}
|
||||
}
|
||||
|
||||
/// List timelines of given tenant in remote storage
|
||||
pub async fn list_remote_timelines(
|
||||
async fn list_identifiers<T>(
|
||||
storage: &GenericRemoteStorage,
|
||||
tenant_shard_id: TenantShardId,
|
||||
prefix: RemotePath,
|
||||
cancel: CancellationToken,
|
||||
) -> anyhow::Result<(HashSet<TimelineId>, HashSet<String>)> {
|
||||
let remote_path = remote_timelines_path(&tenant_shard_id).add_trailing_slash();
|
||||
|
||||
fail::fail_point!("storage-sync-list-remote-timelines", |_| {
|
||||
anyhow::bail!("storage-sync-list-remote-timelines");
|
||||
});
|
||||
|
||||
) -> anyhow::Result<(HashSet<T>, HashSet<String>)>
|
||||
where
|
||||
T: FromStr + Eq + std::hash::Hash,
|
||||
{
|
||||
let listing = download_retry_forever(
|
||||
|| {
|
||||
storage.list(
|
||||
Some(&remote_path),
|
||||
ListingMode::WithDelimiter,
|
||||
None,
|
||||
&cancel,
|
||||
)
|
||||
},
|
||||
&format!("list timelines for {tenant_shard_id}"),
|
||||
|| storage.list(Some(&prefix), ListingMode::WithDelimiter, None, &cancel),
|
||||
&format!("list identifiers in prefix {prefix}"),
|
||||
&cancel,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let mut timeline_ids = HashSet::new();
|
||||
let mut parsed_ids = HashSet::new();
|
||||
let mut other_prefixes = HashSet::new();
|
||||
|
||||
for timeline_remote_storage_key in listing.prefixes {
|
||||
let object_name = timeline_remote_storage_key.object_name().ok_or_else(|| {
|
||||
anyhow::anyhow!("failed to get timeline id for remote tenant {tenant_shard_id}")
|
||||
for id_remote_storage_key in listing.prefixes {
|
||||
let object_name = id_remote_storage_key.object_name().ok_or_else(|| {
|
||||
anyhow::anyhow!("failed to get object name for key {id_remote_storage_key}")
|
||||
})?;
|
||||
|
||||
match object_name.parse::<TimelineId>() {
|
||||
Ok(t) => timeline_ids.insert(t),
|
||||
match object_name.parse::<T>() {
|
||||
Ok(t) => parsed_ids.insert(t),
|
||||
Err(_) => other_prefixes.insert(object_name.to_string()),
|
||||
};
|
||||
}
|
||||
@@ -301,7 +290,31 @@ pub async fn list_remote_timelines(
|
||||
other_prefixes.insert(object_name.to_string());
|
||||
}
|
||||
|
||||
Ok((timeline_ids, other_prefixes))
|
||||
Ok((parsed_ids, other_prefixes))
|
||||
}
|
||||
|
||||
/// List shards of given tenant in remote storage
|
||||
pub(crate) async fn list_remote_tenant_shards(
|
||||
storage: &GenericRemoteStorage,
|
||||
tenant_id: TenantId,
|
||||
cancel: CancellationToken,
|
||||
) -> anyhow::Result<(HashSet<TenantShardId>, HashSet<String>)> {
|
||||
let remote_path = remote_tenant_path(&TenantShardId::unsharded(tenant_id));
|
||||
list_identifiers::<TenantShardId>(storage, remote_path, cancel).await
|
||||
}
|
||||
|
||||
/// List timelines of given tenant shard in remote storage
|
||||
pub async fn list_remote_timelines(
|
||||
storage: &GenericRemoteStorage,
|
||||
tenant_shard_id: TenantShardId,
|
||||
cancel: CancellationToken,
|
||||
) -> anyhow::Result<(HashSet<TimelineId>, HashSet<String>)> {
|
||||
fail::fail_point!("storage-sync-list-remote-timelines", |_| {
|
||||
anyhow::bail!("storage-sync-list-remote-timelines");
|
||||
});
|
||||
|
||||
let remote_path = remote_timelines_path(&tenant_shard_id).add_trailing_slash();
|
||||
list_identifiers::<TimelineId>(storage, remote_path, cancel).await
|
||||
}
|
||||
|
||||
async fn do_download_index_part(
|
||||
@@ -310,7 +323,7 @@ async fn do_download_index_part(
|
||||
timeline_id: &TimelineId,
|
||||
index_generation: Generation,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<IndexPart, DownloadError> {
|
||||
) -> Result<(IndexPart, Generation), DownloadError> {
|
||||
let remote_path = remote_index_path(tenant_shard_id, timeline_id, index_generation);
|
||||
|
||||
let index_part_bytes = download_retry_forever(
|
||||
@@ -335,7 +348,7 @@ async fn do_download_index_part(
|
||||
.with_context(|| format!("deserialize index part file at {remote_path:?}"))
|
||||
.map_err(DownloadError::Other)?;
|
||||
|
||||
Ok(index_part)
|
||||
Ok((index_part, index_generation))
|
||||
}
|
||||
|
||||
/// index_part.json objects are suffixed with a generation number, so we cannot
|
||||
@@ -344,13 +357,13 @@ async fn do_download_index_part(
|
||||
/// In this function we probe for the most recent index in a generation <= our current generation.
|
||||
/// See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
|
||||
#[tracing::instrument(skip_all, fields(generation=?my_generation))]
|
||||
pub(super) async fn download_index_part(
|
||||
pub(crate) async fn download_index_part(
|
||||
storage: &GenericRemoteStorage,
|
||||
tenant_shard_id: &TenantShardId,
|
||||
timeline_id: &TimelineId,
|
||||
my_generation: Generation,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<IndexPart, DownloadError> {
|
||||
) -> Result<(IndexPart, Generation), DownloadError> {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
if my_generation.is_none() {
|
||||
|
||||
@@ -118,6 +118,9 @@ pub(super) async fn gather_inputs(
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<ModelInputs> {
|
||||
// refresh is needed to update gc related pitr_cutoff and horizon_cutoff
|
||||
//
|
||||
// FIXME: if a single timeline is deleted while refresh gc info is ongoing, we will fail the
|
||||
// whole computation. It does not make sense from the billing perspective.
|
||||
tenant
|
||||
.refresh_gc_info(cancel, ctx)
|
||||
.await
|
||||
|
||||
@@ -17,7 +17,7 @@ use anyhow::{anyhow, ensure, Result};
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::models::InMemoryLayerInfo;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use std::collections::{BinaryHeap, HashMap, HashSet};
|
||||
use std::collections::{BTreeMap, BinaryHeap, HashSet};
|
||||
use std::sync::{Arc, OnceLock};
|
||||
use std::time::Instant;
|
||||
use tracing::*;
|
||||
@@ -78,10 +78,10 @@ impl std::fmt::Debug for InMemoryLayer {
|
||||
}
|
||||
|
||||
pub struct InMemoryLayerInner {
|
||||
/// All versions of all pages in the layer are kept here. Indexed
|
||||
/// All versions of all pages in the layer are kept here. Indexed
|
||||
/// by block number and LSN. The value is an offset into the
|
||||
/// ephemeral file where the page version is stored.
|
||||
index: HashMap<Key, VecMap<Lsn, u64>>,
|
||||
index: BTreeMap<Key, VecMap<Lsn, u64>>,
|
||||
|
||||
/// The values are stored in a serialized format in this file.
|
||||
/// Each serialized Value is preceded by a 'u32' length field.
|
||||
@@ -384,29 +384,24 @@ impl InMemoryLayer {
|
||||
let mut planned_block_reads = BinaryHeap::new();
|
||||
|
||||
for range in keyspace.ranges.iter() {
|
||||
let mut key = range.start;
|
||||
while key < range.end {
|
||||
if let Some(vec_map) = inner.index.get(&key) {
|
||||
let lsn_range = match reconstruct_state.get_cached_lsn(&key) {
|
||||
Some(cached_lsn) => (cached_lsn + 1)..end_lsn,
|
||||
None => self.start_lsn..end_lsn,
|
||||
};
|
||||
for (key, vec_map) in inner.index.range(range.start..range.end) {
|
||||
let lsn_range = match reconstruct_state.get_cached_lsn(key) {
|
||||
Some(cached_lsn) => (cached_lsn + 1)..end_lsn,
|
||||
None => self.start_lsn..end_lsn,
|
||||
};
|
||||
|
||||
let slice = vec_map.slice_range(lsn_range);
|
||||
for (entry_lsn, pos) in slice.iter().rev() {
|
||||
planned_block_reads.push(BlockRead {
|
||||
key,
|
||||
lsn: *entry_lsn,
|
||||
block_offset: *pos,
|
||||
});
|
||||
}
|
||||
let slice = vec_map.slice_range(lsn_range);
|
||||
for (entry_lsn, pos) in slice.iter().rev() {
|
||||
planned_block_reads.push(BlockRead {
|
||||
key: *key,
|
||||
lsn: *entry_lsn,
|
||||
block_offset: *pos,
|
||||
});
|
||||
}
|
||||
|
||||
key = key.next();
|
||||
}
|
||||
}
|
||||
|
||||
let keyspace_size = keyspace.total_size();
|
||||
let keyspace_size = keyspace.total_raw_size();
|
||||
|
||||
let mut completed_keys = HashSet::new();
|
||||
while completed_keys.len() < keyspace_size && !planned_block_reads.is_empty() {
|
||||
@@ -482,7 +477,7 @@ impl InMemoryLayer {
|
||||
trace!("initializing new empty InMemoryLayer for writing on timeline {timeline_id} at {start_lsn}");
|
||||
|
||||
let file = EphemeralFile::create(conf, tenant_shard_id, timeline_id).await?;
|
||||
let key = InMemoryLayerFileId(file.id());
|
||||
let key = InMemoryLayerFileId(file.page_cache_file_id());
|
||||
|
||||
Ok(InMemoryLayer {
|
||||
file_id: key,
|
||||
@@ -499,7 +494,7 @@ impl InMemoryLayer {
|
||||
end_lsn: OnceLock::new(),
|
||||
opened_at: Instant::now(),
|
||||
inner: RwLock::new(InMemoryLayerInner {
|
||||
index: HashMap::new(),
|
||||
index: BTreeMap::new(),
|
||||
file,
|
||||
resource_units: GlobalResourceUnits::new(),
|
||||
}),
|
||||
@@ -636,26 +631,17 @@ impl InMemoryLayer {
|
||||
|
||||
let cursor = inner.file.block_cursor();
|
||||
|
||||
// Sort the keys because delta layer writer expects them sorted.
|
||||
//
|
||||
// NOTE: this sort can take up significant time if the layer has millions of
|
||||
// keys. To speed up all the comparisons we convert the key to i128 and
|
||||
// keep the value as a reference.
|
||||
let mut keys: Vec<_> = inner.index.iter().map(|(k, m)| (k.to_i128(), m)).collect();
|
||||
keys.sort_unstable_by_key(|k| k.0);
|
||||
|
||||
let ctx = RequestContextBuilder::extend(ctx)
|
||||
.page_content_kind(PageContentKind::InMemoryLayer)
|
||||
.build();
|
||||
for (key, vec_map) in keys.iter() {
|
||||
let key = Key::from_i128(*key);
|
||||
for (key, vec_map) in inner.index.iter() {
|
||||
// Write all page versions
|
||||
for (lsn, pos) in vec_map.as_slice() {
|
||||
cursor.read_blob_into_buf(*pos, &mut buf, &ctx).await?;
|
||||
let will_init = Value::des(&buf)?.will_init();
|
||||
let res;
|
||||
(buf, res) = delta_layer_writer
|
||||
.put_value_bytes(key, *lsn, buf, will_init)
|
||||
.put_value_bytes(*key, *lsn, buf, will_init)
|
||||
.await;
|
||||
res?;
|
||||
}
|
||||
|
||||
@@ -17,13 +17,13 @@ use fail::fail_point;
|
||||
use once_cell::sync::Lazy;
|
||||
use pageserver_api::{
|
||||
key::{AUX_FILES_KEY, NON_INHERITED_RANGE},
|
||||
keyspace::KeySpaceAccum,
|
||||
keyspace::{KeySpaceAccum, ShardedRange},
|
||||
models::{
|
||||
CompactionAlgorithm, DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest,
|
||||
EvictionPolicy, InMemoryLayerInfo, LayerMapInfo, TimelineState,
|
||||
},
|
||||
reltag::BlockNumber,
|
||||
shard::{ShardIdentity, ShardNumber, TenantShardId},
|
||||
shard::{ShardCount, ShardIdentity, ShardNumber, TenantShardId},
|
||||
};
|
||||
use rand::Rng;
|
||||
use serde_with::serde_as;
|
||||
@@ -936,7 +936,7 @@ impl Timeline {
|
||||
return Err(GetVectoredError::InvalidLsn(lsn));
|
||||
}
|
||||
|
||||
let key_count = keyspace.total_size().try_into().unwrap();
|
||||
let key_count = keyspace.total_raw_size().try_into().unwrap();
|
||||
if key_count > Timeline::MAX_GET_VECTORED_KEYS {
|
||||
return Err(GetVectoredError::Oversized(key_count));
|
||||
}
|
||||
@@ -1076,7 +1076,7 @@ impl Timeline {
|
||||
mut reconstruct_state: ValuesReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
|
||||
let get_kind = if keyspace.total_size() == 1 {
|
||||
let get_kind = if keyspace.total_raw_size() == 1 {
|
||||
GetKind::Singular
|
||||
} else {
|
||||
GetKind::Vectored
|
||||
@@ -1149,6 +1149,11 @@ impl Timeline {
|
||||
panic!(concat!("Sequential get failed with {}, but vectored get did not",
|
||||
" - keyspace={:?} lsn={}"),
|
||||
seq_err, keyspace, lsn) },
|
||||
(Ok(_), Err(GetVectoredError::GetReadyAncestorError(GetReadyAncestorError::AncestorLsnTimeout(_)))) => {
|
||||
// Sequential get runs after vectored get, so it is possible for the later
|
||||
// to time out while waiting for its ancestor's Lsn to become ready and for the
|
||||
// former to succeed (it essentially has a doubled wait time).
|
||||
},
|
||||
(Ok(_), Err(vec_err)) => {
|
||||
panic!(concat!("Vectored get failed with {}, but sequential get did not",
|
||||
" - keyspace={:?} lsn={}"),
|
||||
@@ -1871,6 +1876,15 @@ const REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE: u64 = 10;
|
||||
|
||||
// Private functions
|
||||
impl Timeline {
|
||||
#[allow(dead_code)]
|
||||
pub(crate) fn get_switch_to_aux_file_v2(&self) -> bool {
|
||||
let tenant_conf = self.tenant_conf.load();
|
||||
tenant_conf
|
||||
.tenant_conf
|
||||
.switch_to_aux_file_v2
|
||||
.unwrap_or(self.conf.default_tenant_conf.switch_to_aux_file_v2)
|
||||
}
|
||||
|
||||
pub(crate) fn get_lazy_slru_download(&self) -> bool {
|
||||
let tenant_conf = self.tenant_conf.load();
|
||||
tenant_conf
|
||||
@@ -3193,7 +3207,7 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
if keyspace.total_size() == 0 || timeline.ancestor_timeline.is_none() {
|
||||
if keyspace.total_raw_size() == 0 || timeline.ancestor_timeline.is_none() {
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -3206,7 +3220,7 @@ impl Timeline {
|
||||
timeline = &*timeline_owned;
|
||||
}
|
||||
|
||||
if keyspace.total_size() != 0 {
|
||||
if keyspace.total_raw_size() != 0 {
|
||||
return Err(GetVectoredError::MissingKey(keyspace.start().unwrap()));
|
||||
}
|
||||
|
||||
@@ -3897,7 +3911,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
let keyspace = self.collect_keyspace(lsn, ctx).await?;
|
||||
let partitioning = keyspace.partition(partition_size);
|
||||
let partitioning = keyspace.partition(&self.shard_identity, partition_size);
|
||||
|
||||
*partitioning_guard = (partitioning, lsn);
|
||||
|
||||
@@ -3906,24 +3920,6 @@ impl Timeline {
|
||||
|
||||
// Is it time to create a new image layer for the given partition?
|
||||
async fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> bool {
|
||||
let last = self.last_image_layer_creation_check_at.load();
|
||||
if lsn != Lsn(0) {
|
||||
let distance = lsn
|
||||
.checked_sub(last)
|
||||
.expect("Attempt to compact with LSN going backwards");
|
||||
|
||||
let min_distance = self.get_image_layer_creation_check_threshold() as u64
|
||||
* self.get_checkpoint_distance();
|
||||
|
||||
// Skip the expensive delta layer counting below if we've not ingested
|
||||
// sufficient WAL since the last check.
|
||||
if distance.0 < min_distance {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
self.last_image_layer_creation_check_at.store(lsn);
|
||||
|
||||
let threshold = self.get_image_creation_threshold();
|
||||
|
||||
let guard = self.layers.read().await;
|
||||
@@ -3995,9 +3991,37 @@ impl Timeline {
|
||||
// image layers <100000000..100000099> and <200000000..200000199> are not completely covering it.
|
||||
let mut start = Key::MIN;
|
||||
|
||||
let check_for_image_layers = {
|
||||
let last_checks_at = self.last_image_layer_creation_check_at.load();
|
||||
let distance = lsn
|
||||
.checked_sub(last_checks_at)
|
||||
.expect("Attempt to compact with LSN going backwards");
|
||||
let min_distance = self.get_image_layer_creation_check_threshold() as u64
|
||||
* self.get_checkpoint_distance();
|
||||
|
||||
// Skip the expensive delta layer counting if this timeline has not ingested sufficient
|
||||
// WAL since the last check.
|
||||
distance.0 >= min_distance
|
||||
};
|
||||
|
||||
if check_for_image_layers {
|
||||
self.last_image_layer_creation_check_at.store(lsn);
|
||||
}
|
||||
|
||||
for partition in partitioning.parts.iter() {
|
||||
let img_range = start..partition.ranges.last().unwrap().end;
|
||||
if !force && !self.time_for_new_image_layer(partition, lsn).await {
|
||||
|
||||
let do_it = if force {
|
||||
true
|
||||
} else if check_for_image_layers {
|
||||
// [`Self::time_for_new_image_layer`] is CPU expensive,
|
||||
// so skip if we've not collected enough WAL since the last time
|
||||
self.time_for_new_image_layer(partition, lsn).await
|
||||
} else {
|
||||
false
|
||||
};
|
||||
|
||||
if !do_it {
|
||||
start = img_range.end;
|
||||
continue;
|
||||
}
|
||||
@@ -4040,7 +4064,7 @@ impl Timeline {
|
||||
key = key.next();
|
||||
|
||||
// Maybe flush `key_rest_accum`
|
||||
if key_request_accum.size() >= Timeline::MAX_GET_VECTORED_KEYS
|
||||
if key_request_accum.raw_size() >= Timeline::MAX_GET_VECTORED_KEYS
|
||||
|| last_key_in_range
|
||||
{
|
||||
let results = self
|
||||
@@ -4326,6 +4350,12 @@ impl Timeline {
|
||||
cancel: &CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
let _timer = self
|
||||
.metrics
|
||||
.update_gc_info_histo
|
||||
.start_timer()
|
||||
.record_on_drop();
|
||||
|
||||
// First, calculate pitr_cutoff_timestamp and then convert it to LSN.
|
||||
//
|
||||
// Some unit tests depend on garbage-collection working even when
|
||||
@@ -4488,6 +4518,28 @@ impl Timeline {
|
||||
'outer: for l in layers.iter_historic_layers() {
|
||||
result.layers_total += 1;
|
||||
|
||||
// 0. Is this layer a relic from a shard split?
|
||||
// (Do this check first because irrespective of later logic regarding LSNs, this
|
||||
// layer should be dropped.)
|
||||
if self.shard_identity.count >= ShardCount::new(2) {
|
||||
// We are a sharded tenant
|
||||
let layer = guard.get_from_desc(&l);
|
||||
if layer.metadata().shard != self.tenant_shard_id.to_index() {
|
||||
// This is an ancestral layer
|
||||
let sharded_range = ShardedRange::new(l.get_key_range(), &self.shard_identity);
|
||||
if sharded_range.page_count() == 0 {
|
||||
// This ancestral layer only covers keys that belong to other shards
|
||||
info!(
|
||||
"garbate collecting layer {} ({:?}) after shard split",
|
||||
l.filename(),
|
||||
l.get_key_range()
|
||||
);
|
||||
layers_to_remove.push(l);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 1. Is it newer than GC horizon cutoff point?
|
||||
if l.get_lsn_range().end > horizon_cutoff {
|
||||
debug!(
|
||||
|
||||
@@ -15,7 +15,7 @@ use anyhow::{anyhow, Context};
|
||||
use enumset::EnumSet;
|
||||
use fail::fail_point;
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use pageserver_api::shard::{ShardIdentity, TenantShardId};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, info, info_span, trace, warn, Instrument};
|
||||
use utils::id::TimelineId;
|
||||
@@ -831,6 +831,10 @@ impl CompactionJobExecutor for TimelineAdaptor {
|
||||
|
||||
type RequestContext = crate::context::RequestContext;
|
||||
|
||||
fn get_shard_identity(&self) -> &ShardIdentity {
|
||||
self.timeline.get_shard_identity()
|
||||
}
|
||||
|
||||
async fn get_layers(
|
||||
&mut self,
|
||||
key_range: &Range<Key>,
|
||||
|
||||
@@ -37,7 +37,6 @@ pub(crate) use io_engine::IoEngineKind;
|
||||
pub(crate) use metadata::Metadata;
|
||||
pub(crate) use open_options::*;
|
||||
|
||||
#[cfg_attr(not(target_os = "linux"), allow(dead_code))]
|
||||
pub(crate) mod owned_buffers_io {
|
||||
//! Abstractions for IO with owned buffers.
|
||||
//!
|
||||
|
||||
@@ -14,8 +14,17 @@ impl<W> Writer<W> {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn bytes_written(&self) -> u64 {
|
||||
self.bytes_amount
|
||||
}
|
||||
|
||||
pub fn as_inner(&self) -> &W {
|
||||
&self.dst
|
||||
}
|
||||
|
||||
/// Returns the wrapped `VirtualFile` object as well as the number
|
||||
/// of bytes that were written to it through this object.
|
||||
#[cfg_attr(target_os = "macos", allow(dead_code))]
|
||||
pub fn into_inner(self) -> (u64, W) {
|
||||
(self.bytes_amount, self.dst)
|
||||
}
|
||||
|
||||
@@ -47,6 +47,16 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
pub fn as_inner(&self) -> &W {
|
||||
&self.writer
|
||||
}
|
||||
|
||||
/// Panics if used after any of the write paths returned an error
|
||||
pub fn inspect_buffer(&self) -> &B {
|
||||
self.buf()
|
||||
}
|
||||
|
||||
#[cfg_attr(target_os = "macos", allow(dead_code))]
|
||||
pub async fn flush_and_into_inner(mut self) -> std::io::Result<W> {
|
||||
self.flush().await?;
|
||||
let Self { buf, writer } = self;
|
||||
@@ -61,6 +71,7 @@ where
|
||||
.expect("must not use after we returned an error")
|
||||
}
|
||||
|
||||
#[cfg_attr(target_os = "macos", allow(dead_code))]
|
||||
pub async fn write_buffered<S: IoBuf>(&mut self, chunk: Slice<S>) -> std::io::Result<(usize, S)>
|
||||
where
|
||||
S: IoBuf + Send,
|
||||
@@ -100,6 +111,28 @@ where
|
||||
Ok((chunk_len, chunk.into_inner()))
|
||||
}
|
||||
|
||||
/// Strictly less performant variant of [`Self::write_buffered`] that allows writing borrowed data.
|
||||
///
|
||||
/// It is less performant because we always have to copy the borrowed data into the internal buffer
|
||||
/// before we can do the IO. The [`Self::write_buffered`] can avoid this, which is more performant
|
||||
/// for large writes.
|
||||
pub async fn write_buffered_borrowed(&mut self, mut chunk: &[u8]) -> std::io::Result<usize> {
|
||||
let chunk_len = chunk.len();
|
||||
while !chunk.is_empty() {
|
||||
let buf = self.buf.as_mut().expect("must not use after an error");
|
||||
let need = buf.cap() - buf.pending();
|
||||
let have = chunk.len();
|
||||
let n = std::cmp::min(need, have);
|
||||
buf.extend_from_slice(&chunk[..n]);
|
||||
chunk = &chunk[n..];
|
||||
if buf.pending() >= buf.cap() {
|
||||
assert_eq!(buf.pending(), buf.cap());
|
||||
self.flush().await?;
|
||||
}
|
||||
}
|
||||
Ok(chunk_len)
|
||||
}
|
||||
|
||||
async fn flush(&mut self) -> std::io::Result<()> {
|
||||
let buf = self.buf.take().expect("must not use after an error");
|
||||
let buf_len = buf.pending();
|
||||
@@ -266,4 +299,31 @@ mod tests {
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_write_all_borrowed_always_goes_through_buffer() -> std::io::Result<()> {
|
||||
let recorder = RecorderWriter::default();
|
||||
let mut writer = BufferedWriter::new(recorder, BytesMut::with_capacity(2));
|
||||
|
||||
writer.write_buffered_borrowed(b"abc").await?;
|
||||
writer.write_buffered_borrowed(b"d").await?;
|
||||
writer.write_buffered_borrowed(b"e").await?;
|
||||
writer.write_buffered_borrowed(b"fg").await?;
|
||||
writer.write_buffered_borrowed(b"hi").await?;
|
||||
writer.write_buffered_borrowed(b"j").await?;
|
||||
writer.write_buffered_borrowed(b"klmno").await?;
|
||||
|
||||
let recorder = writer.flush_and_into_inner().await?;
|
||||
assert_eq!(
|
||||
recorder.writes,
|
||||
{
|
||||
let expect: &[&[u8]] = &[b"ab", b"cd", b"ef", b"gh", b"ij", b"kl", b"mn", b"o"];
|
||||
expect
|
||||
}
|
||||
.iter()
|
||||
.map(|v| v[..].to_vec())
|
||||
.collect::<Vec<_>>()
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -279,7 +279,7 @@ async fn handle_client(
|
||||
|
||||
// doesn't yet matter as pg-sni-router doesn't report analytics logs
|
||||
ctx.set_success();
|
||||
ctx.log();
|
||||
ctx.log_connect();
|
||||
|
||||
// Starting from here we only proxy the client's traffic.
|
||||
info!("performing the proxy pass...");
|
||||
|
||||
@@ -260,7 +260,9 @@ impl ConnCfg {
|
||||
aux: MetricsAuxInfo,
|
||||
timeout: Duration,
|
||||
) -> Result<PostgresConnection, ConnectionError> {
|
||||
let pause = ctx.latency_timer.pause(crate::metrics::Waiting::Compute);
|
||||
let (socket_addr, stream, host) = self.connect_raw(timeout).await?;
|
||||
drop(pause);
|
||||
|
||||
let tls_connector = native_tls::TlsConnector::builder()
|
||||
.danger_accept_invalid_certs(allow_self_signed_compute)
|
||||
@@ -270,7 +272,9 @@ impl ConnCfg {
|
||||
let tls = MakeTlsConnect::<tokio::net::TcpStream>::make_tls_connect(&mut mk_tls, host)?;
|
||||
|
||||
// connect_raw() will not use TLS if sslmode is "disable"
|
||||
let pause = ctx.latency_timer.pause(crate::metrics::Waiting::Compute);
|
||||
let (client, connection) = self.0.connect_raw(stream, tls).await?;
|
||||
drop(pause);
|
||||
tracing::Span::current().record("pid", &tracing::field::display(client.get_process_id()));
|
||||
let stream = connection.stream.into_inner();
|
||||
|
||||
|
||||
@@ -75,6 +75,7 @@ pub type ComputeReady = DatabaseInfo;
|
||||
|
||||
// TODO: replace with an http-based protocol.
|
||||
struct MgmtHandler;
|
||||
#[async_trait::async_trait]
|
||||
impl postgres_backend::Handler<tokio::net::TcpStream> for MgmtHandler {
|
||||
async fn process_query(
|
||||
&mut self,
|
||||
@@ -88,8 +89,6 @@ impl postgres_backend::Handler<tokio::net::TcpStream> for MgmtHandler {
|
||||
}
|
||||
}
|
||||
|
||||
impl postgres_backend::HandlerSync<tokio::net::TcpStream> for MgmtHandler {}
|
||||
|
||||
fn try_process_query(pgb: &mut PostgresBackendTCP, query: &str) -> Result<(), QueryError> {
|
||||
let resp: KickSession = serde_json::from_str(query).context("Failed to parse query as json")?;
|
||||
|
||||
|
||||
@@ -20,7 +20,8 @@ use self::parquet::RequestData;
|
||||
|
||||
pub mod parquet;
|
||||
|
||||
static LOG_CHAN: OnceCell<mpsc::WeakUnboundedSender<RequestData>> = OnceCell::new();
|
||||
pub static LOG_CHAN: OnceCell<mpsc::WeakUnboundedSender<RequestData>> = OnceCell::new();
|
||||
pub static LOG_CHAN_DISCONNECT: OnceCell<mpsc::WeakUnboundedSender<RequestData>> = OnceCell::new();
|
||||
|
||||
/// Context data for a single request to connect to a database.
|
||||
///
|
||||
@@ -49,9 +50,12 @@ pub struct RequestMonitoring {
|
||||
// extra
|
||||
// This sender is here to keep the request monitoring channel open while requests are taking place.
|
||||
sender: Option<mpsc::UnboundedSender<RequestData>>,
|
||||
// This sender is only used to log the length of session in case of success.
|
||||
disconnect_sender: Option<mpsc::UnboundedSender<RequestData>>,
|
||||
pub latency_timer: LatencyTimer,
|
||||
// Whether proxy decided that it's not a valid endpoint end rejected it before going to cplane.
|
||||
rejected: Option<bool>,
|
||||
disconnect_timestamp: Option<chrono::DateTime<Utc>>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
@@ -100,7 +104,9 @@ impl RequestMonitoring {
|
||||
cold_start_info: ColdStartInfo::Unknown,
|
||||
|
||||
sender: LOG_CHAN.get().and_then(|tx| tx.upgrade()),
|
||||
disconnect_sender: LOG_CHAN_DISCONNECT.get().and_then(|tx| tx.upgrade()),
|
||||
latency_timer: LatencyTimer::new(protocol),
|
||||
disconnect_timestamp: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -190,11 +196,7 @@ impl RequestMonitoring {
|
||||
self.success = true;
|
||||
}
|
||||
|
||||
pub fn log(self) {}
|
||||
}
|
||||
|
||||
impl Drop for RequestMonitoring {
|
||||
fn drop(&mut self) {
|
||||
pub fn log_connect(&mut self) {
|
||||
let outcome = if self.success {
|
||||
ConnectOutcome::Success
|
||||
} else {
|
||||
@@ -226,4 +228,23 @@ impl Drop for RequestMonitoring {
|
||||
let _: Result<(), _> = tx.send(RequestData::from(&*self));
|
||||
}
|
||||
}
|
||||
|
||||
fn log_disconnect(&mut self) {
|
||||
// If we are here, it's guaranteed that the user successfully connected to the endpoint.
|
||||
// Here we log the length of the session.
|
||||
self.disconnect_timestamp = Some(Utc::now());
|
||||
if let Some(tx) = self.disconnect_sender.take() {
|
||||
let _: Result<(), _> = tx.send(RequestData::from(&*self));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for RequestMonitoring {
|
||||
fn drop(&mut self) {
|
||||
if self.sender.is_some() {
|
||||
self.log_connect();
|
||||
} else {
|
||||
self.log_disconnect();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,7 +19,10 @@ use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, info, Span};
|
||||
use utils::backoff;
|
||||
|
||||
use crate::config::{remote_storage_from_toml, OptRemoteStorageConfig};
|
||||
use crate::{
|
||||
config::{remote_storage_from_toml, OptRemoteStorageConfig},
|
||||
context::LOG_CHAN_DISCONNECT,
|
||||
};
|
||||
|
||||
use super::{RequestMonitoring, LOG_CHAN};
|
||||
|
||||
@@ -31,6 +34,9 @@ pub struct ParquetUploadArgs {
|
||||
#[clap(long, default_value = "{}", value_parser = remote_storage_from_toml)]
|
||||
parquet_upload_remote_storage: OptRemoteStorageConfig,
|
||||
|
||||
#[clap(long, default_value = "{}", value_parser = remote_storage_from_toml)]
|
||||
parquet_upload_disconnect_events_remote_storage: OptRemoteStorageConfig,
|
||||
|
||||
/// How many rows to include in a row group
|
||||
#[clap(long, default_value_t = 8192)]
|
||||
parquet_upload_row_group_size: usize,
|
||||
@@ -91,6 +97,8 @@ pub struct RequestData {
|
||||
/// Tracks time from session start (HTTP request/libpq TCP handshake)
|
||||
/// Through to success/failure
|
||||
duration_us: u64,
|
||||
/// If the session was successful after the disconnect, will be created one more event with filled `disconnect_timestamp`.
|
||||
disconnect_timestamp: Option<chrono::NaiveDateTime>,
|
||||
}
|
||||
|
||||
impl From<&RequestMonitoring> for RequestData {
|
||||
@@ -120,6 +128,7 @@ impl From<&RequestMonitoring> for RequestData {
|
||||
.elapsed()
|
||||
.unwrap_or_default()
|
||||
.as_micros() as u64, // 584 millenia... good enough
|
||||
disconnect_timestamp: value.disconnect_timestamp.map(|x| x.naive_utc()),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -141,8 +150,9 @@ pub async fn worker(
|
||||
LOG_CHAN.set(tx.downgrade()).unwrap();
|
||||
|
||||
// setup row stream that will close on cancellation
|
||||
let cancellation_token2 = cancellation_token.clone();
|
||||
tokio::spawn(async move {
|
||||
cancellation_token.cancelled().await;
|
||||
cancellation_token2.cancelled().await;
|
||||
// dropping this sender will cause the channel to close only once
|
||||
// all the remaining inflight requests have been completed.
|
||||
drop(tx);
|
||||
@@ -167,9 +177,38 @@ pub async fn worker(
|
||||
test_remote_failures: 0,
|
||||
};
|
||||
|
||||
worker_inner(storage, rx, parquet_config).await
|
||||
// TODO(anna): consider moving this to a separate function.
|
||||
if let Some(disconnect_events_storage_config) =
|
||||
config.parquet_upload_disconnect_events_remote_storage
|
||||
{
|
||||
let (tx_disconnect, mut rx_disconnect) = mpsc::unbounded_channel();
|
||||
LOG_CHAN_DISCONNECT.set(tx_disconnect.downgrade()).unwrap();
|
||||
|
||||
// setup row stream that will close on cancellation
|
||||
tokio::spawn(async move {
|
||||
cancellation_token.cancelled().await;
|
||||
// dropping this sender will cause the channel to close only once
|
||||
// all the remaining inflight requests have been completed.
|
||||
drop(tx_disconnect);
|
||||
});
|
||||
let rx_disconnect = futures::stream::poll_fn(move |cx| rx_disconnect.poll_recv(cx));
|
||||
let rx_disconnect = rx_disconnect.map(RequestData::from);
|
||||
|
||||
let storage_disconnect =
|
||||
GenericRemoteStorage::from_config(&disconnect_events_storage_config)
|
||||
.context("remote storage for disconnect events init")?;
|
||||
let parquet_config_disconnect = parquet_config.clone();
|
||||
tokio::try_join!(
|
||||
worker_inner(storage, rx, parquet_config),
|
||||
worker_inner(storage_disconnect, rx_disconnect, parquet_config_disconnect)
|
||||
)
|
||||
.map(|_| ())
|
||||
} else {
|
||||
worker_inner(storage, rx, parquet_config).await
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
struct ParquetConfig {
|
||||
propeties: WriterPropertiesPtr,
|
||||
rows_per_group: usize,
|
||||
@@ -452,6 +491,7 @@ mod tests {
|
||||
success: rng.gen(),
|
||||
cold_start_info: "no",
|
||||
duration_us: rng.gen_range(0..30_000_000),
|
||||
disconnect_timestamp: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -520,15 +560,15 @@ mod tests {
|
||||
assert_eq!(
|
||||
file_stats,
|
||||
[
|
||||
(1314385, 3, 6000),
|
||||
(1314378, 3, 6000),
|
||||
(1314438, 3, 6000),
|
||||
(1314395, 3, 6000),
|
||||
(1314525, 3, 6000),
|
||||
(1314367, 3, 6000),
|
||||
(1314159, 3, 6000),
|
||||
(1314395, 3, 6000),
|
||||
(438352, 1, 2000)
|
||||
(1315008, 3, 6000),
|
||||
(1315001, 3, 6000),
|
||||
(1315061, 3, 6000),
|
||||
(1315018, 3, 6000),
|
||||
(1315148, 3, 6000),
|
||||
(1314990, 3, 6000),
|
||||
(1314782, 3, 6000),
|
||||
(1315018, 3, 6000),
|
||||
(438575, 1, 2000)
|
||||
]
|
||||
);
|
||||
|
||||
@@ -558,11 +598,11 @@ mod tests {
|
||||
assert_eq!(
|
||||
file_stats,
|
||||
[
|
||||
(1220633, 5, 10000),
|
||||
(1226783, 5, 10000),
|
||||
(1228577, 5, 10000),
|
||||
(1227939, 5, 10000),
|
||||
(1219217, 5, 10000)
|
||||
(1221738, 5, 10000),
|
||||
(1227888, 5, 10000),
|
||||
(1229682, 5, 10000),
|
||||
(1229044, 5, 10000),
|
||||
(1220322, 5, 10000)
|
||||
]
|
||||
);
|
||||
|
||||
@@ -594,11 +634,11 @@ mod tests {
|
||||
assert_eq!(
|
||||
file_stats,
|
||||
[
|
||||
(1206280, 5, 10000),
|
||||
(1206011, 5, 10000),
|
||||
(1206304, 5, 10000),
|
||||
(1206292, 5, 10000),
|
||||
(1206547, 5, 10000)
|
||||
(1207385, 5, 10000),
|
||||
(1207116, 5, 10000),
|
||||
(1207409, 5, 10000),
|
||||
(1207397, 5, 10000),
|
||||
(1207652, 5, 10000)
|
||||
]
|
||||
);
|
||||
|
||||
@@ -623,15 +663,15 @@ mod tests {
|
||||
assert_eq!(
|
||||
file_stats,
|
||||
[
|
||||
(1314385, 3, 6000),
|
||||
(1314378, 3, 6000),
|
||||
(1314438, 3, 6000),
|
||||
(1314395, 3, 6000),
|
||||
(1314525, 3, 6000),
|
||||
(1314367, 3, 6000),
|
||||
(1314159, 3, 6000),
|
||||
(1314395, 3, 6000),
|
||||
(438352, 1, 2000)
|
||||
(1315008, 3, 6000),
|
||||
(1315001, 3, 6000),
|
||||
(1315061, 3, 6000),
|
||||
(1315018, 3, 6000),
|
||||
(1315148, 3, 6000),
|
||||
(1314990, 3, 6000),
|
||||
(1314782, 3, 6000),
|
||||
(1315018, 3, 6000),
|
||||
(438575, 1, 2000)
|
||||
]
|
||||
);
|
||||
|
||||
@@ -668,7 +708,7 @@ mod tests {
|
||||
// files are smaller than the size threshold, but they took too long to fill so were flushed early
|
||||
assert_eq!(
|
||||
file_stats,
|
||||
[(658823, 2, 3001), (658537, 2, 3000), (658333, 2, 2999)]
|
||||
[(659240, 2, 3001), (658954, 2, 3000), (658750, 2, 2999)]
|
||||
);
|
||||
|
||||
tmpdir.close().unwrap();
|
||||
|
||||
@@ -284,6 +284,8 @@ pub struct ComputeConnectionLatencyGroup {
|
||||
pub enum LatencyExclusions {
|
||||
Client,
|
||||
ClientAndCplane,
|
||||
ClientCplaneCompute,
|
||||
ClientCplaneComputeRetry,
|
||||
}
|
||||
|
||||
#[derive(FixedCardinalityLabel, Copy, Clone)]
|
||||
@@ -352,6 +354,7 @@ pub enum Waiting {
|
||||
Cplane,
|
||||
Client,
|
||||
Compute,
|
||||
RetryTimeout,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
@@ -359,6 +362,7 @@ struct Accumulated {
|
||||
cplane: time::Duration,
|
||||
client: time::Duration,
|
||||
compute: time::Duration,
|
||||
retry: time::Duration,
|
||||
}
|
||||
|
||||
pub struct LatencyTimer {
|
||||
@@ -421,6 +425,7 @@ impl Drop for LatencyTimerPause<'_> {
|
||||
Waiting::Cplane => self.timer.accumulated.cplane += dur,
|
||||
Waiting::Client => self.timer.accumulated.client += dur,
|
||||
Waiting::Compute => self.timer.accumulated.compute += dur,
|
||||
Waiting::RetryTimeout => self.timer.accumulated.retry += dur,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -464,6 +469,34 @@ impl Drop for LatencyTimer {
|
||||
},
|
||||
duration.saturating_sub(accumulated_total).as_secs_f64(),
|
||||
);
|
||||
|
||||
// Exclude client cplane, compue communication from the accumulated time.
|
||||
let accumulated_total =
|
||||
self.accumulated.client + self.accumulated.cplane + self.accumulated.compute;
|
||||
metric.observe(
|
||||
ComputeConnectionLatencyGroup {
|
||||
protocol: self.protocol,
|
||||
cold_start_info: self.cold_start_info,
|
||||
outcome: self.outcome,
|
||||
excluded: LatencyExclusions::ClientCplaneCompute,
|
||||
},
|
||||
duration.saturating_sub(accumulated_total).as_secs_f64(),
|
||||
);
|
||||
|
||||
// Exclude client cplane, compue, retry communication from the accumulated time.
|
||||
let accumulated_total = self.accumulated.client
|
||||
+ self.accumulated.cplane
|
||||
+ self.accumulated.compute
|
||||
+ self.accumulated.retry;
|
||||
metric.observe(
|
||||
ComputeConnectionLatencyGroup {
|
||||
protocol: self.protocol,
|
||||
cold_start_info: self.cold_start_info,
|
||||
outcome: self.outcome,
|
||||
excluded: LatencyExclusions::ClientCplaneComputeRetry,
|
||||
},
|
||||
duration.saturating_sub(accumulated_total).as_secs_f64(),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -132,16 +132,14 @@ pub async fn task_main(
|
||||
Err(e) => {
|
||||
// todo: log and push to ctx the error kind
|
||||
ctx.set_error_kind(e.get_error_kind());
|
||||
ctx.log();
|
||||
error!(parent: &span, "per-client task finished with an error: {e:#}");
|
||||
}
|
||||
Ok(None) => {
|
||||
ctx.set_success();
|
||||
ctx.log();
|
||||
}
|
||||
Ok(Some(p)) => {
|
||||
ctx.set_success();
|
||||
ctx.log();
|
||||
ctx.log_connect();
|
||||
match p.proxy_pass().instrument(span.clone()).await {
|
||||
Ok(()) => {}
|
||||
Err(e) => {
|
||||
|
||||
@@ -133,10 +133,17 @@ where
|
||||
|
||||
error!(error = ?err, "could not connect to compute node");
|
||||
|
||||
let node_info = if !node_info.cached() {
|
||||
let node_info = if !node_info.cached() || !err.should_retry_database_address() {
|
||||
// If we just recieved this from cplane and dodn't get it from cache, we shouldn't retry.
|
||||
// Do not need to retrieve a new node_info, just return the old one.
|
||||
if !err.should_retry(num_retries, connect_to_compute_retry_config) {
|
||||
Metrics::get().proxy.retries_metric.observe(
|
||||
RetriesMetricGroup {
|
||||
outcome: ConnectOutcome::Failed,
|
||||
retry_type,
|
||||
},
|
||||
num_retries.into(),
|
||||
);
|
||||
return Err(err.into());
|
||||
}
|
||||
node_info
|
||||
@@ -194,6 +201,10 @@ where
|
||||
let wait_duration = retry_after(num_retries, connect_to_compute_retry_config);
|
||||
num_retries += 1;
|
||||
|
||||
let pause = ctx
|
||||
.latency_timer
|
||||
.pause(crate::metrics::Waiting::RetryTimeout);
|
||||
time::sleep(wait_duration).await;
|
||||
drop(pause);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,6 +10,9 @@ pub trait ShouldRetry {
|
||||
err => err.could_retry(),
|
||||
}
|
||||
}
|
||||
fn should_retry_database_address(&self) -> bool {
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
impl ShouldRetry for io::Error {
|
||||
@@ -33,6 +36,21 @@ impl ShouldRetry for tokio_postgres::error::DbError {
|
||||
| &SqlState::SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION,
|
||||
)
|
||||
}
|
||||
fn should_retry_database_address(&self) -> bool {
|
||||
use tokio_postgres::error::SqlState;
|
||||
// Here are errors that happens after the user successfully authenticated to the database.
|
||||
// TODO: there are pgbouncer errors that should be retried, but they are not listed here.
|
||||
!matches!(
|
||||
self.code(),
|
||||
&SqlState::TOO_MANY_CONNECTIONS
|
||||
| &SqlState::OUT_OF_MEMORY
|
||||
| &SqlState::SYNTAX_ERROR
|
||||
| &SqlState::T_R_SERIALIZATION_FAILURE
|
||||
| &SqlState::INVALID_CATALOG_NAME
|
||||
| &SqlState::INVALID_SCHEMA_NAME
|
||||
| &SqlState::INVALID_PARAMETER_VALUE
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl ShouldRetry for tokio_postgres::Error {
|
||||
@@ -45,6 +63,15 @@ impl ShouldRetry for tokio_postgres::Error {
|
||||
false
|
||||
}
|
||||
}
|
||||
fn should_retry_database_address(&self) -> bool {
|
||||
if let Some(io_err) = self.source().and_then(|x| x.downcast_ref()) {
|
||||
io::Error::should_retry_database_address(io_err)
|
||||
} else if let Some(db_err) = self.source().and_then(|x| x.downcast_ref()) {
|
||||
tokio_postgres::error::DbError::should_retry_database_address(db_err)
|
||||
} else {
|
||||
true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ShouldRetry for compute::ConnectionError {
|
||||
@@ -55,6 +82,13 @@ impl ShouldRetry for compute::ConnectionError {
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
fn should_retry_database_address(&self) -> bool {
|
||||
match self {
|
||||
compute::ConnectionError::Postgres(err) => err.should_retry_database_address(),
|
||||
compute::ConnectionError::CouldNotConnect(err) => err.should_retry_database_address(),
|
||||
_ => true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn retry_after(num_retries: u32, config: RetryConfig) -> time::Duration {
|
||||
|
||||
@@ -54,7 +54,11 @@ pub async fn wake_compute<B: ComputeConnectBackend>(
|
||||
|
||||
let wait_duration = retry_after(*num_retries, config);
|
||||
*num_retries += 1;
|
||||
let pause = ctx
|
||||
.latency_timer
|
||||
.pause(crate::metrics::Waiting::RetryTimeout);
|
||||
tokio::time::sleep(wait_duration).await;
|
||||
drop(pause);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -179,7 +179,9 @@ impl ConnectMechanism for TokioMechanism {
|
||||
.dbname(&self.conn_info.dbname)
|
||||
.connect_timeout(timeout);
|
||||
|
||||
let pause = ctx.latency_timer.pause(crate::metrics::Waiting::Compute);
|
||||
let (client, connection) = config.connect(tokio_postgres::NoTls).await?;
|
||||
drop(pause);
|
||||
|
||||
tracing::Span::current().record("pid", &tracing::field::display(client.get_process_id()));
|
||||
Ok(poll_client(
|
||||
|
||||
@@ -156,17 +156,15 @@ pub async fn serve_websocket(
|
||||
Err(e) => {
|
||||
// todo: log and push to ctx the error kind
|
||||
ctx.set_error_kind(e.get_error_kind());
|
||||
ctx.log();
|
||||
Err(e.into())
|
||||
}
|
||||
Ok(None) => {
|
||||
ctx.set_success();
|
||||
ctx.log();
|
||||
Ok(())
|
||||
}
|
||||
Ok(Some(p)) => {
|
||||
ctx.set_success();
|
||||
ctx.log();
|
||||
ctx.log_connect();
|
||||
p.proxy_pass().await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -25,6 +25,7 @@ async-stream.workspace = true
|
||||
tokio-stream.workspace = true
|
||||
futures-util.workspace = true
|
||||
itertools.workspace = true
|
||||
camino.workspace = true
|
||||
|
||||
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
|
||||
chrono = { workspace = true, default-features = false, features = ["clock", "serde"] }
|
||||
|
||||
@@ -5,6 +5,7 @@ pub mod cloud_admin_api;
|
||||
pub mod garbage;
|
||||
pub mod metadata_stream;
|
||||
pub mod scan_metadata;
|
||||
pub mod tenant_snapshot;
|
||||
|
||||
use std::env;
|
||||
use std::fmt::Display;
|
||||
@@ -23,17 +24,18 @@ use aws_sdk_s3::config::{AsyncSleep, Region, SharedAsyncSleep};
|
||||
use aws_sdk_s3::{Client, Config};
|
||||
use aws_smithy_async::rt::sleep::TokioSleep;
|
||||
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use clap::ValueEnum;
|
||||
use pageserver::tenant::TENANTS_SEGMENT_NAME;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use reqwest::Url;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::io::IsTerminal;
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tracing::error;
|
||||
use tracing_appender::non_blocking::WorkerGuard;
|
||||
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
|
||||
use utils::id::TimelineId;
|
||||
use utils::fs_ext;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
const MAX_RETRIES: usize = 20;
|
||||
const CLOUD_ADMIN_API_TOKEN_ENV_VAR: &str = "CLOUD_ADMIN_API_TOKEN";
|
||||
@@ -147,6 +149,23 @@ impl RootTarget {
|
||||
self.tenants_root().with_sub_segment(&tenant_id.to_string())
|
||||
}
|
||||
|
||||
pub(crate) fn tenant_shards_prefix(&self, tenant_id: &TenantId) -> S3Target {
|
||||
// Only pageserver remote storage contains tenant-shards
|
||||
assert!(matches!(self, Self::Pageserver(_)));
|
||||
let Self::Pageserver(root) = self else {
|
||||
panic!();
|
||||
};
|
||||
|
||||
S3Target {
|
||||
bucket_name: root.bucket_name.clone(),
|
||||
prefix_in_bucket: format!(
|
||||
"{}/{TENANTS_SEGMENT_NAME}/{tenant_id}",
|
||||
root.prefix_in_bucket
|
||||
),
|
||||
delimiter: root.delimiter.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn timelines_root(&self, tenant_id: &TenantShardId) -> S3Target {
|
||||
match self {
|
||||
Self::Pageserver(_) => self.tenant_root(tenant_id).with_sub_segment("timelines"),
|
||||
@@ -240,7 +259,6 @@ pub fn init_logging(file_name: &str) -> WorkerGuard {
|
||||
.with_ansi(false)
|
||||
.with_writer(file_writer);
|
||||
let stderr_logs = fmt::Layer::new()
|
||||
.with_ansi(std::io::stderr().is_terminal())
|
||||
.with_target(false)
|
||||
.with_writer(std::io::stderr);
|
||||
tracing_subscriber::registry()
|
||||
@@ -396,3 +414,50 @@ async fn download_object_with_retries(
|
||||
|
||||
anyhow::bail!("Failed to download objects with key {key} {MAX_RETRIES} times")
|
||||
}
|
||||
|
||||
async fn download_object_to_file(
|
||||
s3_client: &Client,
|
||||
bucket_name: &str,
|
||||
key: &str,
|
||||
version_id: Option<&str>,
|
||||
local_path: &Utf8Path,
|
||||
) -> anyhow::Result<()> {
|
||||
let tmp_path = Utf8PathBuf::from(format!("{local_path}.tmp"));
|
||||
for _ in 0..MAX_RETRIES {
|
||||
tokio::fs::remove_file(&tmp_path)
|
||||
.await
|
||||
.or_else(fs_ext::ignore_not_found)?;
|
||||
|
||||
let mut file = tokio::fs::File::create(&tmp_path)
|
||||
.await
|
||||
.context("Opening output file")?;
|
||||
|
||||
let request = s3_client.get_object().bucket(bucket_name).key(key);
|
||||
|
||||
let request = match version_id {
|
||||
Some(version_id) => request.version_id(version_id),
|
||||
None => request,
|
||||
};
|
||||
|
||||
let response_stream = match request.send().await {
|
||||
Ok(response) => response,
|
||||
Err(e) => {
|
||||
error!(
|
||||
"Failed to download object for key {key} version {}: {e:#}",
|
||||
version_id.unwrap_or("")
|
||||
);
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let mut read_stream = response_stream.body.into_async_read();
|
||||
|
||||
tokio::io::copy(&mut read_stream, &mut file).await?;
|
||||
|
||||
tokio::fs::rename(&tmp_path, local_path).await?;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
anyhow::bail!("Failed to download objects with key {key} {MAX_RETRIES} times")
|
||||
}
|
||||
|
||||
@@ -1,9 +1,12 @@
|
||||
use camino::Utf8PathBuf;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use s3_scrubber::garbage::{find_garbage, purge_garbage, PurgeMode};
|
||||
use s3_scrubber::scan_metadata::scan_metadata;
|
||||
use s3_scrubber::tenant_snapshot::SnapshotDownloader;
|
||||
use s3_scrubber::{init_logging, BucketConfig, ConsoleConfig, NodeKind, TraversingDepth};
|
||||
|
||||
use clap::{Parser, Subcommand};
|
||||
use utils::id::TenantId;
|
||||
|
||||
#[derive(Parser)]
|
||||
#[command(author, version, about, long_about = None)]
|
||||
@@ -38,6 +41,14 @@ enum Command {
|
||||
#[arg(long = "tenant-id", num_args = 0..)]
|
||||
tenant_ids: Vec<TenantShardId>,
|
||||
},
|
||||
TenantSnapshot {
|
||||
#[arg(long = "tenant-id")]
|
||||
tenant_id: TenantId,
|
||||
#[arg(long = "concurrency", short = 'j', default_value_t = 8)]
|
||||
concurrency: usize,
|
||||
#[arg(short, long)]
|
||||
output_path: Utf8PathBuf,
|
||||
},
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
@@ -50,6 +61,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
Command::ScanMetadata { .. } => "scan",
|
||||
Command::FindGarbage { .. } => "find-garbage",
|
||||
Command::PurgeGarbage { .. } => "purge-garbage",
|
||||
Command::TenantSnapshot { .. } => "tenant-snapshot",
|
||||
};
|
||||
let _guard = init_logging(&format!(
|
||||
"{}_{}_{}_{}.log",
|
||||
@@ -102,5 +114,14 @@ async fn main() -> anyhow::Result<()> {
|
||||
Command::PurgeGarbage { input_path, mode } => {
|
||||
purge_garbage(input_path, mode, !cli.delete).await
|
||||
}
|
||||
Command::TenantSnapshot {
|
||||
tenant_id,
|
||||
output_path,
|
||||
concurrency,
|
||||
} => {
|
||||
let downloader =
|
||||
SnapshotDownloader::new(bucket_config, tenant_id, output_path, concurrency)?;
|
||||
downloader.download().await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,7 +5,7 @@ use tokio_stream::Stream;
|
||||
|
||||
use crate::{list_objects_with_retries, RootTarget, S3Target, TenantShardTimelineId};
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use utils::id::TimelineId;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
/// Given an S3 bucket, output a stream of TenantIds discovered via ListObjectsv2
|
||||
pub fn stream_tenants<'a>(
|
||||
@@ -45,6 +45,62 @@ pub fn stream_tenants<'a>(
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn stream_tenant_shards<'a>(
|
||||
s3_client: &'a Client,
|
||||
target: &'a RootTarget,
|
||||
tenant_id: TenantId,
|
||||
) -> anyhow::Result<impl Stream<Item = Result<TenantShardId, anyhow::Error>> + 'a> {
|
||||
let mut tenant_shard_ids: Vec<Result<TenantShardId, anyhow::Error>> = Vec::new();
|
||||
let mut continuation_token = None;
|
||||
let shards_target = target.tenant_shards_prefix(&tenant_id);
|
||||
|
||||
loop {
|
||||
tracing::info!("Listing in {}", shards_target.prefix_in_bucket);
|
||||
let fetch_response =
|
||||
list_objects_with_retries(s3_client, &shards_target, continuation_token.clone()).await;
|
||||
let fetch_response = match fetch_response {
|
||||
Err(e) => {
|
||||
tenant_shard_ids.push(Err(e));
|
||||
break;
|
||||
}
|
||||
Ok(r) => r,
|
||||
};
|
||||
|
||||
let new_entry_ids = fetch_response
|
||||
.common_prefixes()
|
||||
.iter()
|
||||
.filter_map(|prefix| prefix.prefix())
|
||||
.filter_map(|prefix| -> Option<&str> {
|
||||
prefix
|
||||
.strip_prefix(&target.tenants_root().prefix_in_bucket)?
|
||||
.strip_suffix('/')
|
||||
})
|
||||
.map(|entry_id_str| {
|
||||
let first_part = entry_id_str.split('/').next().unwrap();
|
||||
|
||||
first_part
|
||||
.parse::<TenantShardId>()
|
||||
.with_context(|| format!("Incorrect entry id str: {first_part}"))
|
||||
});
|
||||
|
||||
for i in new_entry_ids {
|
||||
tenant_shard_ids.push(i);
|
||||
}
|
||||
|
||||
match fetch_response.next_continuation_token {
|
||||
Some(new_token) => continuation_token = Some(new_token),
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
|
||||
Ok(stream! {
|
||||
for i in tenant_shard_ids {
|
||||
let id = i?;
|
||||
yield Ok(id);
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Given a TenantShardId, output a stream of the timelines within that tenant, discovered
|
||||
/// using ListObjectsv2. The listing is done before the stream is built, so that this
|
||||
/// function can be used to generate concurrency on a stream using buffer_unordered.
|
||||
|
||||
293
s3_scrubber/src/tenant_snapshot.rs
Normal file
293
s3_scrubber/src/tenant_snapshot.rs
Normal file
@@ -0,0 +1,293 @@
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::checks::{list_timeline_blobs, BlobDataParseResult, S3TimelineBlobData};
|
||||
use crate::metadata_stream::{stream_tenant_shards, stream_tenant_timelines};
|
||||
use crate::{
|
||||
download_object_to_file, init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId,
|
||||
};
|
||||
use anyhow::Context;
|
||||
use async_stream::stream;
|
||||
use aws_sdk_s3::Client;
|
||||
use camino::Utf8PathBuf;
|
||||
use futures::{StreamExt, TryStreamExt};
|
||||
use pageserver::tenant::remote_timeline_client::index::IndexLayerMetadata;
|
||||
use pageserver::tenant::storage_layer::LayerFileName;
|
||||
use pageserver::tenant::IndexPart;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use utils::generation::Generation;
|
||||
use utils::id::TenantId;
|
||||
|
||||
pub struct SnapshotDownloader {
|
||||
s3_client: Arc<Client>,
|
||||
s3_root: RootTarget,
|
||||
bucket_config: BucketConfig,
|
||||
tenant_id: TenantId,
|
||||
output_path: Utf8PathBuf,
|
||||
concurrency: usize,
|
||||
}
|
||||
|
||||
impl SnapshotDownloader {
|
||||
pub fn new(
|
||||
bucket_config: BucketConfig,
|
||||
tenant_id: TenantId,
|
||||
output_path: Utf8PathBuf,
|
||||
concurrency: usize,
|
||||
) -> anyhow::Result<Self> {
|
||||
let (s3_client, s3_root) = init_remote(bucket_config.clone(), NodeKind::Pageserver)?;
|
||||
Ok(Self {
|
||||
s3_client,
|
||||
s3_root,
|
||||
bucket_config,
|
||||
tenant_id,
|
||||
output_path,
|
||||
concurrency,
|
||||
})
|
||||
}
|
||||
|
||||
async fn download_layer(
|
||||
&self,
|
||||
ttid: TenantShardTimelineId,
|
||||
layer_name: LayerFileName,
|
||||
layer_metadata: IndexLayerMetadata,
|
||||
) -> anyhow::Result<(LayerFileName, IndexLayerMetadata)> {
|
||||
// Note this is local as in a local copy of S3 data, not local as in the pageserver's local format. They use
|
||||
// different layer names (remote-style has the generation suffix)
|
||||
let local_path = self.output_path.join(format!(
|
||||
"{}/timelines/{}/{}{}",
|
||||
ttid.tenant_shard_id,
|
||||
ttid.timeline_id,
|
||||
layer_name.file_name(),
|
||||
layer_metadata.generation.get_suffix()
|
||||
));
|
||||
|
||||
// We should only be called for layers that are owned by the input TTID
|
||||
assert_eq!(layer_metadata.shard, ttid.tenant_shard_id.to_index());
|
||||
|
||||
// Assumption: we always write layer files atomically, and layer files are immutable. Therefore if the file
|
||||
// already exists on local disk, we assume it is fully correct and skip it.
|
||||
if tokio::fs::try_exists(&local_path).await? {
|
||||
tracing::debug!("{} already exists", local_path);
|
||||
return Ok((layer_name, layer_metadata));
|
||||
} else {
|
||||
tracing::debug!("{} requires download...", local_path);
|
||||
|
||||
let timeline_root = self.s3_root.timeline_root(&ttid);
|
||||
let remote_layer_path = format!(
|
||||
"{}{}{}",
|
||||
timeline_root.prefix_in_bucket,
|
||||
layer_name.file_name(),
|
||||
layer_metadata.generation.get_suffix()
|
||||
);
|
||||
|
||||
// List versions: the object might be deleted.
|
||||
let versions = self
|
||||
.s3_client
|
||||
.list_object_versions()
|
||||
.bucket(self.bucket_config.bucket.clone())
|
||||
.prefix(&remote_layer_path)
|
||||
.send()
|
||||
.await?;
|
||||
let Some(version) = versions.versions.as_ref().and_then(|v| v.first()) else {
|
||||
return Err(anyhow::anyhow!("No versions found for {remote_layer_path}"));
|
||||
};
|
||||
download_object_to_file(
|
||||
&self.s3_client,
|
||||
&self.bucket_config.bucket,
|
||||
&remote_layer_path,
|
||||
version.version_id.as_deref(),
|
||||
&local_path,
|
||||
)
|
||||
.await?;
|
||||
|
||||
tracing::debug!("Downloaded successfully to {local_path}");
|
||||
}
|
||||
|
||||
Ok((layer_name, layer_metadata))
|
||||
}
|
||||
|
||||
/// Download many layers belonging to the same TTID, with some concurrency
|
||||
async fn download_layers(
|
||||
&self,
|
||||
ttid: TenantShardTimelineId,
|
||||
layers: Vec<(LayerFileName, IndexLayerMetadata)>,
|
||||
) -> anyhow::Result<()> {
|
||||
let layer_count = layers.len();
|
||||
tracing::info!("Downloading {} layers for timeline {ttid}...", layer_count);
|
||||
let layers_stream = stream! {
|
||||
for (layer_name, layer_metadata) in layers {
|
||||
yield self.download_layer(ttid, layer_name, layer_metadata);
|
||||
}
|
||||
};
|
||||
|
||||
tokio::fs::create_dir_all(self.output_path.join(format!(
|
||||
"{}/timelines/{}",
|
||||
ttid.tenant_shard_id, ttid.timeline_id
|
||||
)))
|
||||
.await?;
|
||||
|
||||
let layer_results = layers_stream.buffered(self.concurrency);
|
||||
let mut layer_results = std::pin::pin!(layer_results);
|
||||
|
||||
let mut err = None;
|
||||
let mut download_count = 0;
|
||||
while let Some(i) = layer_results.next().await {
|
||||
download_count += 1;
|
||||
match i {
|
||||
Ok((layer_name, layer_metadata)) => {
|
||||
tracing::info!(
|
||||
"[{download_count}/{layer_count}] OK: {} bytes {ttid} {}",
|
||||
layer_metadata.file_size,
|
||||
layer_name.file_name()
|
||||
);
|
||||
}
|
||||
Err(e) => {
|
||||
// Warn and continue: we will download what we can
|
||||
tracing::warn!("Download error: {e}");
|
||||
err = Some(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
if let Some(e) = err {
|
||||
tracing::warn!("Some errors occurred downloading {ttid} layers, last error: {e}");
|
||||
Err(e)
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
async fn download_timeline(
|
||||
&self,
|
||||
ttid: TenantShardTimelineId,
|
||||
index_part: IndexPart,
|
||||
index_part_generation: Generation,
|
||||
ancestor_layers: &mut HashMap<
|
||||
TenantShardTimelineId,
|
||||
HashMap<LayerFileName, IndexLayerMetadata>,
|
||||
>,
|
||||
) -> anyhow::Result<()> {
|
||||
let index_bytes = serde_json::to_string(&index_part).unwrap();
|
||||
|
||||
let layers = index_part
|
||||
.layer_metadata
|
||||
.into_iter()
|
||||
.filter_map(|(layer_name, layer_metadata)| {
|
||||
if layer_metadata.shard.shard_count != ttid.tenant_shard_id.shard_count {
|
||||
// Accumulate ancestor layers for later download
|
||||
let ancestor_ttid = TenantShardTimelineId::new(
|
||||
TenantShardId {
|
||||
tenant_id: ttid.tenant_shard_id.tenant_id,
|
||||
shard_number: layer_metadata.shard.shard_number,
|
||||
shard_count: layer_metadata.shard.shard_count,
|
||||
},
|
||||
ttid.timeline_id,
|
||||
);
|
||||
let ancestor_ttid_layers = ancestor_layers.entry(ancestor_ttid).or_default();
|
||||
use std::collections::hash_map::Entry;
|
||||
match ancestor_ttid_layers.entry(layer_name) {
|
||||
Entry::Occupied(entry) => {
|
||||
// Descendent shards that reference a layer from an ancestor should always have matching metadata,
|
||||
// as their siblings, because it is read atomically during a shard split.
|
||||
assert_eq!(entry.get(), &layer_metadata);
|
||||
}
|
||||
Entry::Vacant(entry) => {
|
||||
entry.insert(layer_metadata);
|
||||
}
|
||||
}
|
||||
None
|
||||
} else {
|
||||
Some((layer_name, layer_metadata))
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
let download_result = self.download_layers(ttid, layers).await;
|
||||
|
||||
// Write index last, once all the layers it references are downloaded
|
||||
let local_index_path = self.output_path.join(format!(
|
||||
"{}/timelines/{}/index_part.json{}",
|
||||
ttid.tenant_shard_id,
|
||||
ttid.timeline_id,
|
||||
index_part_generation.get_suffix()
|
||||
));
|
||||
tokio::fs::write(&local_index_path, index_bytes)
|
||||
.await
|
||||
.context("writing index")?;
|
||||
|
||||
download_result
|
||||
}
|
||||
|
||||
pub async fn download(&self) -> anyhow::Result<()> {
|
||||
let (s3_client, target) = init_remote(self.bucket_config.clone(), NodeKind::Pageserver)?;
|
||||
|
||||
// Generate a stream of TenantShardId
|
||||
let shards = stream_tenant_shards(&s3_client, &target, self.tenant_id).await?;
|
||||
let shards: Vec<TenantShardId> = shards.try_collect().await?;
|
||||
|
||||
// Only read from shards that have the highest count: avoids redundantly downloading
|
||||
// from ancestor shards.
|
||||
let Some(shard_count) = shards.iter().map(|s| s.shard_count).max() else {
|
||||
anyhow::bail!("No shards found");
|
||||
};
|
||||
|
||||
// We will build a collection of layers in anccestor shards to download (this will only
|
||||
// happen if this tenant has been split at some point)
|
||||
let mut ancestor_layers: HashMap<
|
||||
TenantShardTimelineId,
|
||||
HashMap<LayerFileName, IndexLayerMetadata>,
|
||||
> = Default::default();
|
||||
|
||||
for shard in shards.into_iter().filter(|s| s.shard_count == shard_count) {
|
||||
// Generate a stream of TenantTimelineId
|
||||
let timelines = stream_tenant_timelines(&s3_client, &self.s3_root, shard).await?;
|
||||
|
||||
// Generate a stream of S3TimelineBlobData
|
||||
async fn load_timeline_index(
|
||||
s3_client: &Client,
|
||||
target: &RootTarget,
|
||||
ttid: TenantShardTimelineId,
|
||||
) -> anyhow::Result<(TenantShardTimelineId, S3TimelineBlobData)> {
|
||||
let data = list_timeline_blobs(s3_client, ttid, target).await?;
|
||||
Ok((ttid, data))
|
||||
}
|
||||
let timelines = timelines.map_ok(|ttid| load_timeline_index(&s3_client, &target, ttid));
|
||||
let mut timelines = std::pin::pin!(timelines.try_buffered(8));
|
||||
|
||||
while let Some(i) = timelines.next().await {
|
||||
let (ttid, data) = i?;
|
||||
match data.blob_data {
|
||||
BlobDataParseResult::Parsed {
|
||||
index_part,
|
||||
index_part_generation,
|
||||
s3_layers: _,
|
||||
} => {
|
||||
self.download_timeline(
|
||||
ttid,
|
||||
index_part,
|
||||
index_part_generation,
|
||||
&mut ancestor_layers,
|
||||
)
|
||||
.await
|
||||
.context("Downloading timeline")?;
|
||||
}
|
||||
BlobDataParseResult::Relic => {}
|
||||
BlobDataParseResult::Incorrect(_) => {
|
||||
tracing::error!("Bad metadata in timeline {ttid}");
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
for (ttid, layers) in ancestor_layers.into_iter() {
|
||||
tracing::info!(
|
||||
"Downloading {} layers from ancvestor timeline {ttid}...",
|
||||
layers.len()
|
||||
);
|
||||
|
||||
self.download_layers(ttid, layers.into_iter().collect())
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -2,13 +2,10 @@
|
||||
//! protocol commands.
|
||||
|
||||
use anyhow::Context;
|
||||
use std::net::TcpStream;
|
||||
use std::str::{self, FromStr};
|
||||
use std::sync::Arc;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tokio_io_timeout::TimeoutReader;
|
||||
use tracing::{debug, info, info_span, Instrument};
|
||||
use utils::measured_stream::MeasuredStream;
|
||||
|
||||
use crate::auth::check_permission;
|
||||
use crate::json_ctrl::{handle_json_ctrl, AppendLogicalMessage};
|
||||
@@ -98,7 +95,8 @@ fn cmd_to_string(cmd: &SafekeeperPostgresCommand) -> &str {
|
||||
}
|
||||
}
|
||||
|
||||
impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::HandlerSync<IO>
|
||||
#[async_trait::async_trait]
|
||||
impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
|
||||
for SafekeeperPostgresHandler
|
||||
{
|
||||
// tenant_id and timeline_id are passed in connection string params
|
||||
@@ -193,22 +191,8 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::HandlerSync<IO
|
||||
self.claims = Some(data.claims);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
type IO<'s, R: FnMut(usize), W> =
|
||||
MeasuredStream<std::pin::Pin<&'s mut TimeoutReader<TcpStream>>, R, W>;
|
||||
|
||||
impl<'s, R: FnMut(usize), W> postgres_backend::Handler<IO<'s, R, W>> for SafekeeperPostgresHandler {
|
||||
async fn process_query(
|
||||
&mut self,
|
||||
pgb: &mut PostgresBackend<IO<'s, R, W>>,
|
||||
query_string: &str,
|
||||
) -> Result<(), QueryError> {
|
||||
self.process_query_(pgb, &query_string).await
|
||||
}
|
||||
}
|
||||
impl SafekeeperPostgresHandler {
|
||||
async fn process_query_<IO: AsyncRead + AsyncWrite + Send + Unpin>(
|
||||
&mut self,
|
||||
pgb: &mut PostgresBackend<IO>,
|
||||
query_string: &str,
|
||||
|
||||
@@ -3,11 +3,13 @@ use std::{collections::HashMap, time::Duration};
|
||||
|
||||
use control_plane::endpoint::{ComputeControlPlane, EndpointStatus};
|
||||
use control_plane::local_env::LocalEnv;
|
||||
use futures::StreamExt;
|
||||
use hyper::{Method, StatusCode};
|
||||
use pageserver_api::shard::{ShardCount, ShardNumber, ShardStripeSize, TenantShardId};
|
||||
use postgres_connection::parse_host_port;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{info_span, Instrument};
|
||||
use utils::{
|
||||
backoff::{self},
|
||||
id::{NodeId, TenantId},
|
||||
@@ -420,48 +422,37 @@ impl ComputeHook {
|
||||
.and_then(|x| x)
|
||||
}
|
||||
|
||||
/// Call this to notify the compute (postgres) tier of new pageservers to use
|
||||
/// for a tenant. notify() is called by each shard individually, and this function
|
||||
/// will decide whether an update to the tenant is sent. An update is sent on the
|
||||
/// condition that:
|
||||
/// - We know a pageserver for every shard.
|
||||
/// - All the shards have the same shard_count (i.e. we are not mid-split)
|
||||
///
|
||||
/// Cancellation token enables callers to drop out, e.g. if calling from a Reconciler
|
||||
/// that is cancelled.
|
||||
///
|
||||
/// This function is fallible, including in the case that the control plane is transiently
|
||||
/// unavailable. A limited number of retries are done internally to efficiently hide short unavailability
|
||||
/// periods, but we don't retry forever. The **caller** is responsible for handling failures and
|
||||
/// ensuring that they eventually call again to ensure that the compute is eventually notified of
|
||||
/// the proper pageserver nodes for a tenant.
|
||||
#[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), node_id))]
|
||||
pub(super) async fn notify(
|
||||
/// Synchronous phase: update the per-tenant state for the next intended notification
|
||||
fn notify_prepare(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
node_id: NodeId,
|
||||
stripe_size: ShardStripeSize,
|
||||
) -> MaybeSendResult {
|
||||
let mut state_locked = self.state.lock().unwrap();
|
||||
|
||||
use std::collections::hash_map::Entry;
|
||||
let tenant = match state_locked.entry(tenant_shard_id.tenant_id) {
|
||||
Entry::Vacant(e) => e.insert(ComputeHookTenant::new(
|
||||
tenant_shard_id,
|
||||
stripe_size,
|
||||
node_id,
|
||||
)),
|
||||
Entry::Occupied(e) => {
|
||||
let tenant = e.into_mut();
|
||||
tenant.update(tenant_shard_id, stripe_size, node_id);
|
||||
tenant
|
||||
}
|
||||
};
|
||||
tenant.maybe_send(tenant_shard_id.tenant_id, None)
|
||||
}
|
||||
|
||||
async fn notify_execute(
|
||||
&self,
|
||||
maybe_send_result: MaybeSendResult,
|
||||
tenant_shard_id: TenantShardId,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<(), NotifyError> {
|
||||
let maybe_send_result = {
|
||||
let mut state_locked = self.state.lock().unwrap();
|
||||
|
||||
use std::collections::hash_map::Entry;
|
||||
let tenant = match state_locked.entry(tenant_shard_id.tenant_id) {
|
||||
Entry::Vacant(e) => e.insert(ComputeHookTenant::new(
|
||||
tenant_shard_id,
|
||||
stripe_size,
|
||||
node_id,
|
||||
)),
|
||||
Entry::Occupied(e) => {
|
||||
let tenant = e.into_mut();
|
||||
tenant.update(tenant_shard_id, stripe_size, node_id);
|
||||
tenant
|
||||
}
|
||||
};
|
||||
tenant.maybe_send(tenant_shard_id.tenant_id, None)
|
||||
};
|
||||
|
||||
// Process result: we may get an update to send, or we may have to wait for a lock
|
||||
// before trying again.
|
||||
let (request, mut send_lock_guard) = match maybe_send_result {
|
||||
@@ -469,7 +460,12 @@ impl ComputeHook {
|
||||
return Ok(());
|
||||
}
|
||||
MaybeSendResult::AwaitLock(send_lock) => {
|
||||
let send_locked = send_lock.lock_owned().await;
|
||||
let send_locked = tokio::select! {
|
||||
guard = send_lock.lock_owned() => {guard},
|
||||
_ = cancel.cancelled() => {
|
||||
return Err(NotifyError::ShuttingDown)
|
||||
}
|
||||
};
|
||||
|
||||
// Lock order: maybe_send is called within the `[Self::state]` lock, and takes the send lock, but here
|
||||
// we have acquired the send lock and take `[Self::state]` lock. This is safe because maybe_send only uses
|
||||
@@ -508,6 +504,94 @@ impl ComputeHook {
|
||||
}
|
||||
result
|
||||
}
|
||||
|
||||
/// Infallible synchronous fire-and-forget version of notify(), that sends its results to
|
||||
/// a channel. Something should consume the channel and arrange to try notifying again
|
||||
/// if something failed.
|
||||
pub(super) fn notify_background(
|
||||
self: &Arc<Self>,
|
||||
notifications: Vec<(TenantShardId, NodeId, ShardStripeSize)>,
|
||||
result_tx: tokio::sync::mpsc::Sender<Result<(), (TenantShardId, NotifyError)>>,
|
||||
cancel: &CancellationToken,
|
||||
) {
|
||||
let mut maybe_sends = Vec::new();
|
||||
for (tenant_shard_id, node_id, stripe_size) in notifications {
|
||||
let maybe_send_result = self.notify_prepare(tenant_shard_id, node_id, stripe_size);
|
||||
maybe_sends.push((tenant_shard_id, maybe_send_result))
|
||||
}
|
||||
|
||||
let this = self.clone();
|
||||
let cancel = cancel.clone();
|
||||
|
||||
tokio::task::spawn(async move {
|
||||
// Construct an async stream of futures to invoke the compute notify function: we do this
|
||||
// in order to subsequently use .buffered() on the stream to execute with bounded parallelism. The
|
||||
// ComputeHook semaphore already limits concurrency, but this way we avoid constructing+polling lots of futures which
|
||||
// would mostly just be waiting on that semaphore.
|
||||
let mut stream = futures::stream::iter(maybe_sends)
|
||||
.map(|(tenant_shard_id, maybe_send_result)| {
|
||||
let this = this.clone();
|
||||
let cancel = cancel.clone();
|
||||
|
||||
async move {
|
||||
this
|
||||
.notify_execute(maybe_send_result, tenant_shard_id, &cancel)
|
||||
.await.map_err(|e| (tenant_shard_id, e))
|
||||
}.instrument(info_span!(
|
||||
"notify_background", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()
|
||||
))
|
||||
})
|
||||
.buffered(API_CONCURRENCY);
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
next = stream.next() => {
|
||||
match next {
|
||||
Some(r) => {
|
||||
result_tx.send(r).await.ok();
|
||||
},
|
||||
None => {
|
||||
tracing::info!("Finished sending background compute notifications");
|
||||
break;
|
||||
}
|
||||
}
|
||||
},
|
||||
_ = cancel.cancelled() => {
|
||||
tracing::info!("Shutdown while running background compute notifications");
|
||||
break;
|
||||
}
|
||||
};
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/// Call this to notify the compute (postgres) tier of new pageservers to use
|
||||
/// for a tenant. notify() is called by each shard individually, and this function
|
||||
/// will decide whether an update to the tenant is sent. An update is sent on the
|
||||
/// condition that:
|
||||
/// - We know a pageserver for every shard.
|
||||
/// - All the shards have the same shard_count (i.e. we are not mid-split)
|
||||
///
|
||||
/// Cancellation token enables callers to drop out, e.g. if calling from a Reconciler
|
||||
/// that is cancelled.
|
||||
///
|
||||
/// This function is fallible, including in the case that the control plane is transiently
|
||||
/// unavailable. A limited number of retries are done internally to efficiently hide short unavailability
|
||||
/// periods, but we don't retry forever. The **caller** is responsible for handling failures and
|
||||
/// ensuring that they eventually call again to ensure that the compute is eventually notified of
|
||||
/// the proper pageserver nodes for a tenant.
|
||||
#[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), node_id))]
|
||||
pub(super) async fn notify(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
node_id: NodeId,
|
||||
stripe_size: ShardStripeSize,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<(), NotifyError> {
|
||||
let maybe_send_result = self.notify_prepare(tenant_shard_id, node_id, stripe_size);
|
||||
self.notify_execute(maybe_send_result, tenant_shard_id, cancel)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -522,6 +522,18 @@ async fn handle_tenant_drop(req: Request<Body>) -> Result<Response<Body>, ApiErr
|
||||
json_response(StatusCode::OK, state.service.tenant_drop(tenant_id).await?)
|
||||
}
|
||||
|
||||
async fn handle_tenant_import(req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
|
||||
check_permissions(&req, Scope::PageServerApi)?;
|
||||
|
||||
let state = get_state(&req);
|
||||
|
||||
json_response(
|
||||
StatusCode::OK,
|
||||
state.service.tenant_import(tenant_id).await?,
|
||||
)
|
||||
}
|
||||
|
||||
async fn handle_tenants_dump(req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
check_permissions(&req, Scope::Admin)?;
|
||||
|
||||
@@ -759,6 +771,13 @@ pub fn make_router(
|
||||
.post("/debug/v1/node/:node_id/drop", |r| {
|
||||
named_request_span(r, handle_node_drop, RequestName("debug_v1_node_drop"))
|
||||
})
|
||||
.post("/debug/v1/tenant/:tenant_id/import", |r| {
|
||||
named_request_span(
|
||||
r,
|
||||
handle_tenant_import,
|
||||
RequestName("debug_v1_tenant_import"),
|
||||
)
|
||||
})
|
||||
.get("/debug/v1/tenant", |r| {
|
||||
named_request_span(r, handle_tenants_dump, RequestName("debug_v1_tenant"))
|
||||
})
|
||||
|
||||
@@ -1,13 +1,14 @@
|
||||
use pageserver_api::{
|
||||
models::{
|
||||
LocationConfig, LocationConfigListResponse, PageserverUtilization, SecondaryProgress,
|
||||
TenantShardSplitRequest, TenantShardSplitResponse, TimelineCreateRequest, TimelineInfo,
|
||||
TenantScanRemoteStorageResponse, TenantShardSplitRequest, TenantShardSplitResponse,
|
||||
TimelineCreateRequest, TimelineInfo,
|
||||
},
|
||||
shard::TenantShardId,
|
||||
};
|
||||
use pageserver_client::mgmt_api::{Client, Result};
|
||||
use reqwest::StatusCode;
|
||||
use utils::id::{NodeId, TimelineId};
|
||||
use utils::id::{NodeId, TenantId, TimelineId};
|
||||
|
||||
/// Thin wrapper around [`pageserver_client::mgmt_api::Client`]. It allows the storage
|
||||
/// controller to collect metrics in a non-intrusive manner.
|
||||
@@ -88,6 +89,18 @@ impl PageserverClient {
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) async fn tenant_scan_remote_storage(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
) -> Result<TenantScanRemoteStorageResponse> {
|
||||
measured_request!(
|
||||
"tenant_scan_remote_storage",
|
||||
crate::metrics::Method::Get,
|
||||
&self.node_id_label,
|
||||
self.inner.tenant_scan_remote_storage(tenant_id).await
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) async fn tenant_secondary_download(
|
||||
&self,
|
||||
tenant_id: TenantShardId,
|
||||
|
||||
@@ -8,6 +8,7 @@ use std::{
|
||||
};
|
||||
|
||||
use crate::{
|
||||
compute_hook::NotifyError,
|
||||
id_lock_map::IdLockMap,
|
||||
persistence::{AbortShardSplitStatus, TenantFilter},
|
||||
reconciler::{ReconcileError, ReconcileUnits},
|
||||
@@ -61,7 +62,7 @@ use utils::{
|
||||
};
|
||||
|
||||
use crate::{
|
||||
compute_hook::{self, ComputeHook},
|
||||
compute_hook::ComputeHook,
|
||||
heartbeater::{Heartbeater, PageserverState},
|
||||
node::{AvailabilityTransition, Node},
|
||||
persistence::{split_state::SplitState, DatabaseError, Persistence, TenantShardPersistence},
|
||||
@@ -110,6 +111,42 @@ struct ServiceState {
|
||||
delayed_reconcile_rx: tokio::sync::mpsc::Receiver<TenantShardId>,
|
||||
}
|
||||
|
||||
/// Transform an error from a pageserver into an error to return to callers of a storage
|
||||
/// controller API.
|
||||
fn passthrough_api_error(node: &Node, e: mgmt_api::Error) -> ApiError {
|
||||
match e {
|
||||
mgmt_api::Error::ReceiveErrorBody(str) => {
|
||||
// Presume errors receiving body are connectivity/availability issues
|
||||
ApiError::ResourceUnavailable(
|
||||
format!("{node} error receiving error body: {str}").into(),
|
||||
)
|
||||
}
|
||||
mgmt_api::Error::ReceiveBody(str) => {
|
||||
// Presume errors receiving body are connectivity/availability issues
|
||||
ApiError::ResourceUnavailable(format!("{node} error receiving body: {str}").into())
|
||||
}
|
||||
mgmt_api::Error::ApiError(StatusCode::NOT_FOUND, msg) => {
|
||||
ApiError::NotFound(anyhow::anyhow!(format!("{node}: {msg}")).into())
|
||||
}
|
||||
mgmt_api::Error::ApiError(StatusCode::SERVICE_UNAVAILABLE, msg) => {
|
||||
ApiError::ResourceUnavailable(format!("{node}: {msg}").into())
|
||||
}
|
||||
mgmt_api::Error::ApiError(status @ StatusCode::UNAUTHORIZED, msg)
|
||||
| mgmt_api::Error::ApiError(status @ StatusCode::FORBIDDEN, msg) => {
|
||||
// Auth errors talking to a pageserver are not auth errors for the caller: they are
|
||||
// internal server errors, showing that something is wrong with the pageserver or
|
||||
// storage controller's auth configuration.
|
||||
ApiError::InternalServerError(anyhow::anyhow!("{node} {status}: {msg}"))
|
||||
}
|
||||
mgmt_api::Error::ApiError(status, msg) => {
|
||||
// Presume general case of pageserver API errors is that we tried to do something
|
||||
// that can't be done right now.
|
||||
ApiError::Conflict(format!("{node} {status}: {status} {msg}"))
|
||||
}
|
||||
mgmt_api::Error::Cancelled => ApiError::ShuttingDown,
|
||||
}
|
||||
}
|
||||
|
||||
impl ServiceState {
|
||||
fn new(
|
||||
nodes: HashMap<NodeId, Node>,
|
||||
@@ -296,7 +333,12 @@ impl Service {
|
||||
/// Called once on startup, this function attempts to contact all pageservers to build an up-to-date
|
||||
/// view of the world, and determine which pageservers are responsive.
|
||||
#[instrument(skip_all)]
|
||||
async fn startup_reconcile(self: &Arc<Service>) {
|
||||
async fn startup_reconcile(
|
||||
self: &Arc<Service>,
|
||||
bg_compute_notify_result_tx: tokio::sync::mpsc::Sender<
|
||||
Result<(), (TenantShardId, NotifyError)>,
|
||||
>,
|
||||
) {
|
||||
// For all tenant shards, a vector of observed states on nodes (where None means
|
||||
// indeterminate, same as in [`ObservedStateLocation`])
|
||||
let mut observed: HashMap<TenantShardId, Vec<(NodeId, Option<LocationConfig>)>> =
|
||||
@@ -315,10 +357,6 @@ impl Service {
|
||||
.checked_add(STARTUP_RECONCILE_TIMEOUT / 2)
|
||||
.expect("Reconcile timeout is a modest constant");
|
||||
|
||||
let compute_notify_deadline = start_at
|
||||
.checked_add((STARTUP_RECONCILE_TIMEOUT / 4) * 3)
|
||||
.expect("Reconcile timeout is a modest constant");
|
||||
|
||||
// Accumulate a list of any tenant locations that ought to be detached
|
||||
let mut cleanup = Vec::new();
|
||||
|
||||
@@ -344,6 +382,7 @@ impl Service {
|
||||
let mut compute_notifications = Vec::new();
|
||||
|
||||
// Populate intent and observed states for all tenants, based on reported state on pageservers
|
||||
tracing::info!("Populating tenant shards' states from initial pageserver scan...");
|
||||
let shard_count = {
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let (nodes, tenants, scheduler) = locked.parts_mut();
|
||||
@@ -410,28 +449,27 @@ impl Service {
|
||||
// Emit compute hook notifications for all tenants which are already stably attached. Other tenants
|
||||
// will emit compute hook notifications when they reconcile.
|
||||
//
|
||||
// Ordering: we must complete these notification attempts before doing any other reconciliation for the
|
||||
// tenants named here, because otherwise our calls to notify() might race with more recent values
|
||||
// generated by reconciliation.
|
||||
let notify_failures = self
|
||||
.compute_notify_many(compute_notifications, compute_notify_deadline)
|
||||
.await;
|
||||
|
||||
// Compute notify is fallible. If it fails here, do not delay overall startup: set the
|
||||
// flag on these shards that they have a pending notification.
|
||||
// Update tenant state for any that failed to do their initial compute notify, so that they'll retry later.
|
||||
{
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
for tenant_shard_id in notify_failures.into_iter() {
|
||||
if let Some(shard) = locked.tenants.get_mut(&tenant_shard_id) {
|
||||
shard.pending_compute_notification = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
// Ordering: our calls to notify_background synchronously establish a relative order for these notifications vs. any later
|
||||
// calls into the ComputeHook for the same tenant: we can leave these to run to completion in the background and any later
|
||||
// calls will be correctly ordered wrt these.
|
||||
//
|
||||
// Concurrency: we call notify_background for all tenants, which will create O(N) tokio tasks, but almost all of them
|
||||
// will just wait on the ComputeHook::API_CONCURRENCY semaphore immediately, so very cheap until they get that semaphore
|
||||
// unit and start doing I/O.
|
||||
tracing::info!(
|
||||
"Sending {} compute notifications",
|
||||
compute_notifications.len()
|
||||
);
|
||||
self.compute_hook.notify_background(
|
||||
compute_notifications,
|
||||
bg_compute_notify_result_tx.clone(),
|
||||
&self.cancel,
|
||||
);
|
||||
|
||||
// Finally, now that the service is up and running, launch reconcile operations for any tenants
|
||||
// which require it: under normal circumstances this should only include tenants that were in some
|
||||
// transient state before we restarted, or any tenants whose compute hooks failed above.
|
||||
tracing::info!("Checking for shards in need of reconciliation...");
|
||||
let reconcile_tasks = self.reconcile_all();
|
||||
// We will not wait for these reconciliation tasks to run here: we're now done with startup and
|
||||
// normal operations may proceed.
|
||||
@@ -472,6 +510,7 @@ impl Service {
|
||||
}
|
||||
}
|
||||
|
||||
tracing::info!("Sending initial heartbeats...");
|
||||
let res = self
|
||||
.heartbeater
|
||||
.heartbeat(Arc::new(nodes_to_heartbeat))
|
||||
@@ -508,6 +547,7 @@ impl Service {
|
||||
|
||||
let mut node_list_futs = FuturesUnordered::new();
|
||||
|
||||
tracing::info!("Scanning shards on {} nodes...", nodes.len());
|
||||
for node in nodes.values() {
|
||||
node_list_futs.push({
|
||||
async move {
|
||||
@@ -627,72 +667,6 @@ impl Service {
|
||||
}
|
||||
}
|
||||
|
||||
/// Used during [`Self::startup_reconcile`]: issue many concurrent compute notifications.
|
||||
///
|
||||
/// Returns a set of any shards for which notifications where not acked within the deadline.
|
||||
async fn compute_notify_many(
|
||||
&self,
|
||||
notifications: Vec<(TenantShardId, NodeId, ShardStripeSize)>,
|
||||
deadline: Instant,
|
||||
) -> HashSet<TenantShardId> {
|
||||
let attempt_shards = notifications.iter().map(|i| i.0).collect::<HashSet<_>>();
|
||||
let mut success_shards = HashSet::new();
|
||||
|
||||
// Construct an async stream of futures to invoke the compute notify function: we do this
|
||||
// in order to subsequently use .buffered() on the stream to execute with bounded parallelism.
|
||||
let mut stream = futures::stream::iter(notifications.into_iter())
|
||||
.map(|(tenant_shard_id, node_id, stripe_size)| {
|
||||
let compute_hook = self.compute_hook.clone();
|
||||
let cancel = self.cancel.clone();
|
||||
async move {
|
||||
if let Err(e) = compute_hook
|
||||
.notify(tenant_shard_id, node_id, stripe_size, &cancel)
|
||||
.await
|
||||
{
|
||||
tracing::error!(
|
||||
%tenant_shard_id,
|
||||
%node_id,
|
||||
"Failed to notify compute on startup for shard: {e}"
|
||||
);
|
||||
None
|
||||
} else {
|
||||
Some(tenant_shard_id)
|
||||
}
|
||||
}
|
||||
})
|
||||
.buffered(compute_hook::API_CONCURRENCY);
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
next = stream.next() => {
|
||||
match next {
|
||||
Some(Some(success_shard)) => {
|
||||
// A notification succeeded
|
||||
success_shards.insert(success_shard);
|
||||
},
|
||||
Some(None) => {
|
||||
// A notification that failed
|
||||
},
|
||||
None => {
|
||||
tracing::info!("Successfully sent all compute notifications");
|
||||
break;
|
||||
}
|
||||
}
|
||||
},
|
||||
_ = tokio::time::sleep(deadline.duration_since(Instant::now())) => {
|
||||
// Give up sending any that didn't succeed yet
|
||||
tracing::info!("Reached deadline while sending compute notifications");
|
||||
break;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
attempt_shards
|
||||
.difference(&success_shards)
|
||||
.cloned()
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Long running background task that periodically wakes up and looks for shards that need
|
||||
/// reconciliation. Reconciliation is fallible, so any reconciliation tasks that fail during
|
||||
/// e.g. a tenant create/attach/migrate must eventually be retried: this task is responsible
|
||||
@@ -851,23 +825,45 @@ impl Service {
|
||||
async fn process_results(
|
||||
&self,
|
||||
mut result_rx: tokio::sync::mpsc::UnboundedReceiver<ReconcileResult>,
|
||||
mut bg_compute_hook_result_rx: tokio::sync::mpsc::Receiver<
|
||||
Result<(), (TenantShardId, NotifyError)>,
|
||||
>,
|
||||
) {
|
||||
loop {
|
||||
// Wait for the next result, or for cancellation
|
||||
let result = tokio::select! {
|
||||
tokio::select! {
|
||||
r = result_rx.recv() => {
|
||||
match r {
|
||||
Some(result) => {result},
|
||||
Some(result) => {self.process_result(result);},
|
||||
None => {break;}
|
||||
}
|
||||
}
|
||||
_ = async{
|
||||
match bg_compute_hook_result_rx.recv().await {
|
||||
Some(result) => {
|
||||
if let Err((tenant_shard_id, notify_error)) = result {
|
||||
tracing::warn!("Marking shard {tenant_shard_id} for notification retry, due to error {notify_error}");
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
if let Some(shard) = locked.tenants.get_mut(&tenant_shard_id) {
|
||||
shard.pending_compute_notification = true;
|
||||
}
|
||||
|
||||
}
|
||||
},
|
||||
None => {
|
||||
// This channel is dead, but we don't want to terminate the outer loop{}: just wait for shutdown
|
||||
self.cancel.cancelled().await;
|
||||
}
|
||||
}
|
||||
} => {},
|
||||
_ = self.cancel.cancelled() => {
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
self.process_result(result);
|
||||
}
|
||||
|
||||
// We should only fall through on shutdown
|
||||
assert!(self.cancel.is_cancelled());
|
||||
}
|
||||
|
||||
async fn process_aborts(
|
||||
@@ -1028,6 +1024,10 @@ impl Service {
|
||||
|
||||
let (startup_completion, startup_complete) = utils::completion::channel();
|
||||
|
||||
// This channel is continuously consumed by process_results, so doesn't need to be very large.
|
||||
let (bg_compute_notify_result_tx, bg_compute_notify_result_rx) =
|
||||
tokio::sync::mpsc::channel(512);
|
||||
|
||||
let (delayed_reconcile_tx, delayed_reconcile_rx) =
|
||||
tokio::sync::mpsc::channel(MAX_DELAYED_RECONCILES);
|
||||
|
||||
@@ -1065,7 +1065,9 @@ impl Service {
|
||||
tokio::task::spawn(async move {
|
||||
// Block shutdown until we're done (we must respect self.cancel)
|
||||
if let Ok(_gate) = result_task_this.gate.enter() {
|
||||
result_task_this.process_results(result_rx).await
|
||||
result_task_this
|
||||
.process_results(result_rx, bg_compute_notify_result_rx)
|
||||
.await
|
||||
}
|
||||
});
|
||||
|
||||
@@ -1107,7 +1109,7 @@ impl Service {
|
||||
return;
|
||||
};
|
||||
|
||||
this.startup_reconcile().await;
|
||||
this.startup_reconcile(bg_compute_notify_result_tx).await;
|
||||
drop(startup_completion);
|
||||
}
|
||||
});
|
||||
@@ -2519,17 +2521,7 @@ impl Service {
|
||||
client
|
||||
.timeline_create(tenant_shard_id, &create_req)
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
mgmt_api::Error::ApiError(status, msg)
|
||||
if status == StatusCode::INTERNAL_SERVER_ERROR
|
||||
|| status == StatusCode::NOT_ACCEPTABLE =>
|
||||
{
|
||||
// TODO: handle more error codes, e.g. 503 should be passed through. Make a general wrapper
|
||||
// for pass-through API calls.
|
||||
ApiError::InternalServerError(anyhow::anyhow!(msg))
|
||||
}
|
||||
_ => ApiError::Conflict(format!("Failed to create timeline: {e}")),
|
||||
})
|
||||
.map_err(|e| passthrough_api_error(&node, e))
|
||||
}
|
||||
|
||||
// Because the caller might not provide an explicit LSN, we must do the creation first on a single shard, and then
|
||||
@@ -3654,6 +3646,88 @@ impl Service {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// This is for debug/support only: assuming tenant data is already present in S3, we "create" a
|
||||
/// tenant with a very high generation number so that it will see the existing data.
|
||||
pub(crate) async fn tenant_import(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
) -> Result<TenantCreateResponse, ApiError> {
|
||||
// Pick an arbitrary available pageserver to use for scanning the tenant in remote storage
|
||||
let maybe_node = {
|
||||
self.inner
|
||||
.read()
|
||||
.unwrap()
|
||||
.nodes
|
||||
.values()
|
||||
.find(|n| n.is_available())
|
||||
.cloned()
|
||||
};
|
||||
let Some(node) = maybe_node else {
|
||||
return Err(ApiError::BadRequest(anyhow::anyhow!("No nodes available")));
|
||||
};
|
||||
|
||||
let client = PageserverClient::new(
|
||||
node.get_id(),
|
||||
node.base_url(),
|
||||
self.config.jwt_token.as_deref(),
|
||||
);
|
||||
|
||||
let scan_result = client
|
||||
.tenant_scan_remote_storage(tenant_id)
|
||||
.await
|
||||
.map_err(|e| passthrough_api_error(&node, e))?;
|
||||
|
||||
// A post-split tenant may contain a mixture of shard counts in remote storage: pick the highest count.
|
||||
let Some(shard_count) = scan_result
|
||||
.shards
|
||||
.iter()
|
||||
.map(|s| s.tenant_shard_id.shard_count)
|
||||
.max()
|
||||
else {
|
||||
return Err(ApiError::NotFound(
|
||||
anyhow::anyhow!("No shards found").into(),
|
||||
));
|
||||
};
|
||||
|
||||
// Ideally we would set each newly imported shard's generation independently, but for correctness it is sufficient
|
||||
// to
|
||||
let generation = scan_result
|
||||
.shards
|
||||
.iter()
|
||||
.map(|s| s.generation)
|
||||
.max()
|
||||
.expect("We already validated >0 shards");
|
||||
|
||||
// FIXME: we have no way to recover the shard stripe size from contents of remote storage: this will
|
||||
// only work if they were using the default stripe size.
|
||||
let stripe_size = ShardParameters::DEFAULT_STRIPE_SIZE;
|
||||
|
||||
let (response, waiters) = self
|
||||
.do_tenant_create(TenantCreateRequest {
|
||||
new_tenant_id: TenantShardId::unsharded(tenant_id),
|
||||
generation,
|
||||
|
||||
shard_parameters: ShardParameters {
|
||||
count: shard_count,
|
||||
stripe_size,
|
||||
},
|
||||
placement_policy: Some(PlacementPolicy::Attached(0)), // No secondaries, for convenient debug/hacking
|
||||
|
||||
// There is no way to know what the tenant's config was: revert to defaults
|
||||
config: TenantConfig::default(),
|
||||
})
|
||||
.await?;
|
||||
|
||||
if let Err(e) = self.await_waiters(waiters, SHORT_RECONCILE_TIMEOUT).await {
|
||||
// Since this is a debug/support operation, all kinds of weird issues are possible (e.g. this
|
||||
// tenant doesn't exist in the control plane), so don't fail the request if it can't fully
|
||||
// reconcile, as reconciliation includes notifying compute.
|
||||
tracing::warn!(%tenant_id, "Reconcile not done yet while importing tenant ({e})");
|
||||
}
|
||||
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
/// For debug/support: a full JSON dump of TenantShards. Returns a response so that
|
||||
/// we don't have to make TenantShard clonable in the return path.
|
||||
pub(crate) fn tenants_dump(&self) -> Result<hyper::Response<hyper::Body>, ApiError> {
|
||||
|
||||
@@ -512,6 +512,11 @@ class NeonEnvBuilder:
|
||||
self.pageserver_get_impl = "vectored"
|
||||
log.debug('Overriding pageserver get_impl config to "vectored"')
|
||||
|
||||
self.pageserver_validate_vectored_get: Optional[bool] = None
|
||||
if (validate := os.getenv("PAGESERVER_VALIDATE_VEC_GET")) is not None:
|
||||
self.pageserver_validate_vectored_get = bool(validate)
|
||||
log.debug(f'Overriding pageserver validate_vectored_get config to "{validate}"')
|
||||
|
||||
assert test_name.startswith(
|
||||
"test_"
|
||||
), "Unexpectedly instantiated from outside a test function"
|
||||
@@ -1085,6 +1090,8 @@ class NeonEnv:
|
||||
ps_cfg["get_vectored_impl"] = config.pageserver_get_vectored_impl
|
||||
if config.pageserver_get_impl is not None:
|
||||
ps_cfg["get_impl"] = config.pageserver_get_impl
|
||||
if config.pageserver_validate_vectored_get is not None:
|
||||
ps_cfg["validate_vectored_get"] = config.pageserver_validate_vectored_get
|
||||
|
||||
# Create a corresponding NeonPageserver object
|
||||
self.pageservers.append(
|
||||
@@ -1575,6 +1582,11 @@ class NeonCli(AbstractNeonCli):
|
||||
res.check_returncode()
|
||||
return tenant_id, timeline_id
|
||||
|
||||
def import_tenant(self, tenant_id: TenantId):
|
||||
args = ["tenant", "import", "--tenant-id", str(tenant_id)]
|
||||
res = self.raw_cli(args)
|
||||
res.check_returncode()
|
||||
|
||||
def set_default(self, tenant_id: TenantId):
|
||||
"""
|
||||
Update default tenant for future operations that require tenant_id.
|
||||
@@ -2207,6 +2219,13 @@ class NeonStorageController(MetricsGetter):
|
||||
headers=self.headers(TokenScope.ADMIN),
|
||||
)
|
||||
|
||||
def tenant_import(self, tenant_id: TenantId):
|
||||
self.request(
|
||||
"POST",
|
||||
f"{self.env.storage_controller_api}/debug/v1/tenant/{tenant_id}/import",
|
||||
headers=self.headers(TokenScope.ADMIN),
|
||||
)
|
||||
|
||||
def reconcile_all(self):
|
||||
r = self.request(
|
||||
"POST",
|
||||
@@ -2298,20 +2317,24 @@ class NeonPageserver(PgProtocol):
|
||||
# The entries in the list are regular experessions.
|
||||
self.allowed_errors: List[str] = list(DEFAULT_PAGESERVER_ALLOWED_ERRORS)
|
||||
|
||||
def timeline_dir(self, tenant_id: TenantId, timeline_id: Optional[TimelineId] = None) -> Path:
|
||||
def timeline_dir(
|
||||
self,
|
||||
tenant_shard_id: Union[TenantId, TenantShardId],
|
||||
timeline_id: Optional[TimelineId] = None,
|
||||
) -> Path:
|
||||
"""Get a timeline directory's path based on the repo directory of the test environment"""
|
||||
if timeline_id is None:
|
||||
return self.tenant_dir(tenant_id) / "timelines"
|
||||
return self.tenant_dir(tenant_id) / "timelines" / str(timeline_id)
|
||||
return self.tenant_dir(tenant_shard_id) / "timelines"
|
||||
return self.tenant_dir(tenant_shard_id) / "timelines" / str(timeline_id)
|
||||
|
||||
def tenant_dir(
|
||||
self,
|
||||
tenant_id: Optional[TenantId] = None,
|
||||
tenant_shard_id: Optional[Union[TenantId, TenantShardId]] = None,
|
||||
) -> Path:
|
||||
"""Get a tenant directory's path based on the repo directory of the test environment"""
|
||||
if tenant_id is None:
|
||||
if tenant_shard_id is None:
|
||||
return self.workdir / "tenants"
|
||||
return self.workdir / "tenants" / str(tenant_id)
|
||||
return self.workdir / "tenants" / str(tenant_shard_id)
|
||||
|
||||
def start(
|
||||
self,
|
||||
@@ -2498,8 +2521,10 @@ class NeonPageserver(PgProtocol):
|
||||
client = self.http_client()
|
||||
return client.tenant_location_conf(tenant_id, config, **kwargs)
|
||||
|
||||
def read_tenant_location_conf(self, tenant_id: TenantId) -> dict[str, Any]:
|
||||
path = self.tenant_dir(tenant_id) / "config-v1"
|
||||
def read_tenant_location_conf(
|
||||
self, tenant_shard_id: Union[TenantId, TenantShardId]
|
||||
) -> dict[str, Any]:
|
||||
path = self.tenant_dir(tenant_shard_id) / "config-v1"
|
||||
log.info(f"Reading location conf from {path}")
|
||||
bytes = open(path, "r").read()
|
||||
try:
|
||||
@@ -3703,7 +3728,7 @@ class S3Scrubber:
|
||||
log.warning(f"Scrub environment: {env}")
|
||||
log.warning(f"Output at: {output_path}")
|
||||
|
||||
raise RuntimeError("Remote storage scrub failed")
|
||||
raise RuntimeError(f"Scrubber failed while running {args}")
|
||||
|
||||
assert stdout is not None
|
||||
return stdout
|
||||
@@ -3718,6 +3743,13 @@ class S3Scrubber:
|
||||
log.error(stdout)
|
||||
raise
|
||||
|
||||
def tenant_snapshot(self, tenant_id: TenantId, output_path: Path):
|
||||
stdout = self.scrubber_cli(
|
||||
["tenant-snapshot", "--tenant-id", str(tenant_id), "--output-path", str(output_path)],
|
||||
timeout=30,
|
||||
)
|
||||
log.info(f"tenant-snapshot output: {stdout}")
|
||||
|
||||
|
||||
def _get_test_dir(request: FixtureRequest, top_output_dir: Path, prefix: str) -> Path:
|
||||
"""Compute the path to a working directory for an individual test."""
|
||||
|
||||
@@ -252,8 +252,11 @@ class S3Storage:
|
||||
|
||||
log.info(f"deleted {cnt} objects from remote storage")
|
||||
|
||||
def tenants_path(self) -> str:
|
||||
return f"{self.prefix_in_bucket}/tenants"
|
||||
|
||||
def tenant_path(self, tenant_id: TenantId) -> str:
|
||||
return f"{self.prefix_in_bucket}/tenants/{tenant_id}"
|
||||
return f"{self.tenants_path()}/{tenant_id}"
|
||||
|
||||
def heatmap_key(self, tenant_id: TenantId) -> str:
|
||||
return f"{self.tenant_path(tenant_id)}/{TENANT_HEATMAP_FILE_NAME}"
|
||||
@@ -262,6 +265,9 @@ class S3Storage:
|
||||
r = self.client.get_object(Bucket=self.bucket_name, Key=self.heatmap_key(tenant_id))
|
||||
return json.loads(r["Body"].read().decode("utf-8"))
|
||||
|
||||
def mock_remote_tenant_path(self, tenant_id: TenantId):
|
||||
assert self.real is False
|
||||
|
||||
|
||||
RemoteStorage = Union[LocalFsStorage, S3Storage]
|
||||
|
||||
|
||||
@@ -156,7 +156,11 @@ class TenantShardId:
|
||||
raise ValueError(f"Invalid TenantShardId '{input}'")
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.tenant_id}-{self.shard_number:02x}{self.shard_count:02x}"
|
||||
if self.shard_count > 0:
|
||||
return f"{self.tenant_id}-{self.shard_number:02x}{self.shard_count:02x}"
|
||||
else:
|
||||
# Unsharded case: equivalent of Rust TenantShardId::unsharded(tenant_id)
|
||||
return str(self.tenant_id)
|
||||
|
||||
def __repr__(self):
|
||||
return self.__str__()
|
||||
|
||||
@@ -190,6 +190,7 @@ def test_fully_custom_config(positive_env: NeonEnv):
|
||||
"trace_read_requests": True,
|
||||
"walreceiver_connect_timeout": "13m",
|
||||
"image_layer_creation_check_threshold": 1,
|
||||
"switch_to_aux_file_v2": True,
|
||||
}
|
||||
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
import json
|
||||
import os
|
||||
from typing import Optional
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
@@ -14,8 +16,6 @@ AGGRESIVE_COMPACTION_TENANT_CONF = {
|
||||
# Compact small layers
|
||||
"compaction_target_size": 1024**2,
|
||||
"image_creation_threshold": 2,
|
||||
# INC-186: remove when merging the fix
|
||||
"image_layer_creation_check_threshold": 0,
|
||||
}
|
||||
|
||||
|
||||
@@ -91,3 +91,102 @@ page_cache_size=10
|
||||
# was chosen empirically for this workload.
|
||||
assert non_vectored_average < 8
|
||||
assert vectored_average < 8
|
||||
|
||||
|
||||
# Stripe sizes in number of pages.
|
||||
TINY_STRIPES = 16
|
||||
LARGE_STRIPES = 32768
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"shard_count,stripe_size", [(None, None), (4, TINY_STRIPES), (4, LARGE_STRIPES)]
|
||||
)
|
||||
def test_sharding_compaction(
|
||||
neon_env_builder: NeonEnvBuilder, stripe_size: int, shard_count: Optional[int]
|
||||
):
|
||||
"""
|
||||
Use small stripes, small layers, and small compaction thresholds to exercise how compaction
|
||||
and image layer generation interacts with sharding.
|
||||
|
||||
We are looking for bugs that might emerge from the way sharding uses sparse layer files that
|
||||
only contain some of the keys in the key range covered by the layer, such as errors estimating
|
||||
the size of layers that might result in too-small layer files.
|
||||
"""
|
||||
|
||||
compaction_target_size = 128 * 1024
|
||||
|
||||
TENANT_CONF = {
|
||||
# small checkpointing and compaction targets to ensure we generate many upload operations
|
||||
"checkpoint_distance": f"{128 * 1024}",
|
||||
"compaction_threshold": "1",
|
||||
"compaction_target_size": f"{compaction_target_size}",
|
||||
# no PITR horizon, we specify the horizon when we request on-demand GC
|
||||
"pitr_interval": "0s",
|
||||
# disable background compaction and GC. We invoke it manually when we want it to happen.
|
||||
"gc_period": "0s",
|
||||
"compaction_period": "0s",
|
||||
# create image layers eagerly: we want to exercise image layer creation in this test.
|
||||
"image_creation_threshold": "1",
|
||||
"image_layer_creation_check_threshold": 0,
|
||||
}
|
||||
|
||||
neon_env_builder.num_pageservers = 1 if shard_count is None else shard_count
|
||||
env = neon_env_builder.init_start(
|
||||
initial_tenant_conf=TENANT_CONF,
|
||||
initial_tenant_shard_count=shard_count,
|
||||
initial_tenant_shard_stripe_size=stripe_size,
|
||||
)
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
|
||||
workload = Workload(env, tenant_id, timeline_id)
|
||||
workload.init()
|
||||
workload.write_rows(64)
|
||||
for _i in range(0, 10):
|
||||
# Each of these does some writes then a checkpoint: because we set image_creation_threshold to 1,
|
||||
# these should result in image layers each time we write some data into a shard, and also shards
|
||||
# recieving less data hitting their "empty image layer" path (wherre they should skip writing the layer,
|
||||
# rather than asserting)
|
||||
workload.churn_rows(64)
|
||||
|
||||
# Assert that we got some image layers: this is important because this test's purpose is to exercise the sharding changes
|
||||
# to Timeline::create_image_layers, so if we weren't creating any image layers we wouldn't be doing our job.
|
||||
shard_has_image_layers = []
|
||||
for shard in env.storage_controller.locate(tenant_id):
|
||||
pageserver = env.get_pageserver(shard["node_id"])
|
||||
shard_id = shard["shard_id"]
|
||||
layer_map = pageserver.http_client().layer_map_info(shard_id, timeline_id)
|
||||
image_layer_sizes = {}
|
||||
for layer in layer_map.historic_layers:
|
||||
if layer.kind == "Image":
|
||||
image_layer_sizes[layer.layer_file_name] = layer.layer_file_size
|
||||
|
||||
# Pageserver should assert rather than emit an empty layer file, but double check here
|
||||
assert layer.layer_file_size is not None
|
||||
assert layer.layer_file_size > 0
|
||||
|
||||
shard_has_image_layers.append(len(image_layer_sizes) > 1)
|
||||
log.info(f"Shard {shard_id} image layer sizes: {json.dumps(image_layer_sizes, indent=2)}")
|
||||
|
||||
if stripe_size == TINY_STRIPES:
|
||||
# Checking the average size validates that our keyspace partitioning is properly respecting sharding: if
|
||||
# it was not, we would tend to get undersized layers because the partitioning would overestimate the physical
|
||||
# data in a keyrange.
|
||||
#
|
||||
# We only do this check with tiny stripes, because large stripes may not give all shards enough
|
||||
# data to have statistically significant image layers
|
||||
avg_size = sum(v for v in image_layer_sizes.values()) / len(image_layer_sizes) # type: ignore
|
||||
log.info(f"Shard {shard_id} average image layer size: {avg_size}")
|
||||
assert avg_size > compaction_target_size / 2
|
||||
|
||||
if stripe_size == TINY_STRIPES:
|
||||
# Expect writes were scattered across all pageservers: they should all have compacted some image layers
|
||||
assert all(shard_has_image_layers)
|
||||
else:
|
||||
# With large stripes, it is expected that most of our writes went to one pageserver, so we just require
|
||||
# that at least one of them has some image layers.
|
||||
assert any(shard_has_image_layers)
|
||||
|
||||
# Assert that everything is still readable
|
||||
workload.validate()
|
||||
|
||||
@@ -228,8 +228,9 @@ def test_forward_compatibility(
|
||||
try:
|
||||
# Previous version neon_local and pageserver are not aware
|
||||
# of the new config.
|
||||
# TODO: remove this once the code reaches main
|
||||
# TODO: remove these once the previous version of neon local supports them
|
||||
neon_env_builder.pageserver_get_impl = None
|
||||
neon_env_builder.pageserver_validate_vectored_get = None
|
||||
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
neon_local_binpath = neon_env_builder.neon_binpath
|
||||
|
||||
111
test_runner/regress/test_s3_scrubber.py
Normal file
111
test_runner/regress/test_s3_scrubber.py
Normal file
@@ -0,0 +1,111 @@
|
||||
import os
|
||||
import shutil
|
||||
from typing import Optional
|
||||
|
||||
import pytest
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
S3Scrubber,
|
||||
)
|
||||
from fixtures.remote_storage import S3Storage, s3_storage
|
||||
from fixtures.types import TenantShardId
|
||||
from fixtures.workload import Workload
|
||||
|
||||
|
||||
@pytest.mark.parametrize("shard_count", [None, 4])
|
||||
def test_scrubber_tenant_snapshot(neon_env_builder: NeonEnvBuilder, shard_count: Optional[int]):
|
||||
"""
|
||||
Test the `tenant-snapshot` subcommand, which grabs data from remote storage
|
||||
|
||||
This is only a support/debug tool, but worth testing to ensure the tool does not regress.
|
||||
"""
|
||||
|
||||
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
|
||||
neon_env_builder.num_pageservers = shard_count if shard_count is not None else 1
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
branch = "main"
|
||||
|
||||
# Do some work
|
||||
workload = Workload(env, tenant_id, timeline_id, branch)
|
||||
workload.init()
|
||||
|
||||
# Multiple write/flush passes to generate multiple layers
|
||||
for _n in range(0, 3):
|
||||
workload.write_rows(128)
|
||||
|
||||
# Do some more work after a restart, so that we have multiple generations
|
||||
for pageserver in env.pageservers:
|
||||
pageserver.stop()
|
||||
pageserver.start()
|
||||
|
||||
for _n in range(0, 3):
|
||||
workload.write_rows(128)
|
||||
|
||||
# If we're doing multiple shards, split: this is important to exercise
|
||||
# the scrubber's ability to understand the references from child shards to parent shard's layers
|
||||
if shard_count is not None:
|
||||
tenant_shard_ids = env.storage_controller.tenant_shard_split(
|
||||
tenant_id, shard_count=shard_count
|
||||
)
|
||||
|
||||
# Write after shard split: this will result in shards containing a mixture of owned
|
||||
# and parent layers in their index.
|
||||
workload.write_rows(128)
|
||||
else:
|
||||
tenant_shard_ids = [TenantShardId(tenant_id, 0, 0)]
|
||||
|
||||
output_path = neon_env_builder.test_output_dir / "snapshot"
|
||||
os.makedirs(output_path)
|
||||
|
||||
scrubber = S3Scrubber(neon_env_builder)
|
||||
scrubber.tenant_snapshot(tenant_id, output_path)
|
||||
|
||||
assert len(os.listdir(output_path)) > 0
|
||||
|
||||
workload.stop()
|
||||
|
||||
# Stop pageservers
|
||||
for pageserver in env.pageservers:
|
||||
pageserver.stop()
|
||||
|
||||
# Drop all shards' local storage
|
||||
for tenant_shard_id in tenant_shard_ids:
|
||||
pageserver = env.get_tenant_pageserver(tenant_shard_id)
|
||||
shutil.rmtree(pageserver.timeline_dir(tenant_shard_id, timeline_id))
|
||||
|
||||
# Replace remote storage contents with the snapshot we downloaded
|
||||
assert isinstance(env.pageserver_remote_storage, S3Storage)
|
||||
|
||||
remote_tenant_path = env.pageserver_remote_storage.tenant_path(tenant_id)
|
||||
|
||||
# Delete current remote storage contents
|
||||
bucket = env.pageserver_remote_storage.bucket_name
|
||||
remote_client = env.pageserver_remote_storage.client
|
||||
deleted = 0
|
||||
for object in remote_client.list_objects_v2(Bucket=bucket, Prefix=remote_tenant_path)[
|
||||
"Contents"
|
||||
]:
|
||||
key = object["Key"]
|
||||
remote_client.delete_object(Key=key, Bucket=bucket)
|
||||
deleted += 1
|
||||
assert deleted > 0
|
||||
|
||||
# Upload from snapshot
|
||||
for root, _dirs, files in os.walk(output_path):
|
||||
for file in files:
|
||||
full_local_path = os.path.join(root, file)
|
||||
full_remote_path = (
|
||||
env.pageserver_remote_storage.tenants_path()
|
||||
+ "/"
|
||||
+ full_local_path.removeprefix(f"{output_path}/")
|
||||
)
|
||||
remote_client.upload_file(full_local_path, bucket, full_remote_path)
|
||||
|
||||
for pageserver in env.pageservers:
|
||||
pageserver.start()
|
||||
|
||||
# Check we can read everything
|
||||
workload.validate()
|
||||
@@ -26,6 +26,7 @@ from fixtures.pg_version import PgVersion
|
||||
from fixtures.remote_storage import RemoteStorageKind, s3_storage
|
||||
from fixtures.types import TenantId, TenantShardId, TimelineId
|
||||
from fixtures.utils import run_pg_bench_small, subprocess_capture, wait_until
|
||||
from fixtures.workload import Workload
|
||||
from mypy_boto3_s3.type_defs import (
|
||||
ObjectTypeDef,
|
||||
)
|
||||
@@ -1256,3 +1257,85 @@ def test_storcon_cli(neon_env_builder: NeonEnvBuilder):
|
||||
# Quiesce any background reconciliation before doing consistency check
|
||||
env.storage_controller.reconcile_until_idle(timeout_secs=10)
|
||||
env.storage_controller.consistency_check()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("remote_storage", [RemoteStorageKind.LOCAL_FS, s3_storage()])
|
||||
@pytest.mark.parametrize("shard_count", [None, 4])
|
||||
def test_tenant_import(neon_env_builder: NeonEnvBuilder, shard_count, remote_storage):
|
||||
"""
|
||||
Tenant import is a support/debug tool for recovering a tenant from remote storage
|
||||
if we don't have any metadata for it in the storage controller.
|
||||
"""
|
||||
|
||||
# This test is parametrized on remote storage because it exercises the relatively rare
|
||||
# code path of listing with a prefix that is not a directory name: this helps us notice
|
||||
# quickly if local_fs or s3_bucket implementations diverge.
|
||||
neon_env_builder.enable_pageserver_remote_storage(remote_storage)
|
||||
|
||||
# Use multiple pageservers because some test helpers assume single sharded tenants
|
||||
# if there is only one pageserver.
|
||||
neon_env_builder.num_pageservers = 2
|
||||
|
||||
env = neon_env_builder.init_start(initial_tenant_shard_count=shard_count)
|
||||
tenant_id = env.initial_tenant
|
||||
|
||||
# Create a second timeline to ensure that import finds both
|
||||
timeline_a = env.initial_timeline
|
||||
timeline_b = env.neon_cli.create_branch("branch_b", tenant_id=tenant_id)
|
||||
|
||||
workload_a = Workload(env, tenant_id, timeline_a, branch_name="main")
|
||||
workload_a.init()
|
||||
|
||||
workload_b = Workload(env, tenant_id, timeline_b, branch_name="branch_b")
|
||||
workload_b.init()
|
||||
|
||||
# Write some data
|
||||
workload_a.write_rows(72)
|
||||
expect_rows_a = workload_a.expect_rows
|
||||
workload_a.stop()
|
||||
del workload_a
|
||||
|
||||
# Bump generation to make sure generation recovery works properly
|
||||
for pageserver in env.pageservers:
|
||||
pageserver.stop()
|
||||
pageserver.start()
|
||||
|
||||
# Write some data in the higher generation into the other branch
|
||||
workload_b.write_rows(107)
|
||||
expect_rows_b = workload_b.expect_rows
|
||||
workload_b.stop()
|
||||
del workload_b
|
||||
|
||||
# Detach from pageservers
|
||||
env.storage_controller.tenant_policy_update(
|
||||
tenant_id,
|
||||
{
|
||||
"placement": "Detached",
|
||||
},
|
||||
)
|
||||
env.storage_controller.reconcile_until_idle(timeout_secs=10)
|
||||
|
||||
# Force-drop it from the storage controller
|
||||
env.storage_controller.request(
|
||||
"POST",
|
||||
f"{env.storage_controller_api}/debug/v1/tenant/{tenant_id}/drop",
|
||||
headers=env.storage_controller.headers(TokenScope.ADMIN),
|
||||
)
|
||||
|
||||
# Now import it again
|
||||
env.neon_cli.import_tenant(tenant_id)
|
||||
|
||||
# Check we found the shards
|
||||
describe = env.storage_controller.tenant_describe(tenant_id)
|
||||
literal_shard_count = 1 if shard_count is None else shard_count
|
||||
assert len(describe["shards"]) == literal_shard_count
|
||||
|
||||
# Check the data is still there: this implicitly proves that we recovered generation numbers
|
||||
# properly, for the timeline which was written to after a generation bump.
|
||||
for timeline, branch, expect_rows in [
|
||||
(timeline_a, "main", expect_rows_a),
|
||||
(timeline_b, "branch_1", expect_rows_b),
|
||||
]:
|
||||
workload = Workload(env, tenant_id, timeline, branch_name=branch)
|
||||
workload.expect_rows = expect_rows
|
||||
workload.validate()
|
||||
|
||||
@@ -132,7 +132,7 @@ def test_tenant_reattach(neon_env_builder: NeonEnvBuilder, mode: str):
|
||||
assert query_scalar(cur, "SELECT count(*) FROM t") == 100000
|
||||
|
||||
# Check that we had to retry the downloads
|
||||
assert env.pageserver.log_contains(".*list timelines.*failed, will retry.*")
|
||||
assert env.pageserver.log_contains(".*list identifiers.*failed, will retry.*")
|
||||
assert env.pageserver.log_contains(".*download.*failed, will retry.*")
|
||||
|
||||
|
||||
|
||||
@@ -16,7 +16,7 @@ commands:
|
||||
- name: sql-exporter
|
||||
user: nobody
|
||||
sysvInitAction: respawn
|
||||
shell: '/bin/sql_exporter -config.file=/etc/sql_exporter.yml'
|
||||
shell: '/bin/sql_exporter -config.file=/etc/sql_exporter.yml -web.listen-address=:9399'
|
||||
shutdownHook: |
|
||||
su -p postgres --session-command '/usr/local/bin/pg_ctl stop -D /var/db/postgres/compute/pgdata -m fast --wait -t 10'
|
||||
files:
|
||||
|
||||
Reference in New Issue
Block a user