mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-03 13:30:38 +00:00
Compare commits
5 Commits
jcsp/delet
...
problame/b
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a2a6bfc85c | ||
|
|
f3ae4eabdc | ||
|
|
38b360c935 | ||
|
|
c1069f1bd5 | ||
|
|
e54ca7e30a |
@@ -1,17 +1,3 @@
|
||||
# The binaries are really slow, if you compile them in 'dev' mode with the defaults.
|
||||
# Enable some optimizations even in 'dev' mode, to make tests faster. The basic
|
||||
# optimizations enabled by "opt-level=1" don't affect debuggability too much.
|
||||
#
|
||||
# See https://www.reddit.com/r/rust/comments/gvrgca/this_is_a_neat_trick_for_getting_good_runtime/
|
||||
#
|
||||
[profile.dev.package."*"]
|
||||
# Set the default for dependencies in Development mode.
|
||||
opt-level = 3
|
||||
|
||||
[profile.dev]
|
||||
# Turn on a small amount of optimization in Development mode.
|
||||
opt-level = 1
|
||||
|
||||
[build]
|
||||
# This is only present for local builds, as it will be overridden
|
||||
# by the RUSTDOCFLAGS env var in CI.
|
||||
|
||||
5
.github/ISSUE_TEMPLATE/epic-template.md
vendored
5
.github/ISSUE_TEMPLATE/epic-template.md
vendored
@@ -17,9 +17,8 @@ assignees: ''
|
||||
## Implementation ideas
|
||||
|
||||
|
||||
```[tasklist]
|
||||
### Tasks
|
||||
```
|
||||
## Tasks
|
||||
- [ ]
|
||||
|
||||
|
||||
## Other related tasks and Epics
|
||||
|
||||
31
Cargo.lock
generated
31
Cargo.lock
generated
@@ -2927,6 +2927,16 @@ dependencies = [
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nu-ansi-term"
|
||||
version = "0.46.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
|
||||
dependencies = [
|
||||
"overload",
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num-bigint"
|
||||
version = "0.4.3"
|
||||
@@ -3193,6 +3203,12 @@ version = "0.5.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4030760ffd992bef45b0ae3f10ce1aba99e33464c90d14dd7c039884963ddc7a"
|
||||
|
||||
[[package]]
|
||||
name = "overload"
|
||||
version = "0.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
|
||||
|
||||
[[package]]
|
||||
name = "pagectl"
|
||||
version = "0.1.0"
|
||||
@@ -3278,10 +3294,12 @@ dependencies = [
|
||||
"tokio",
|
||||
"tokio-io-timeout",
|
||||
"tokio-postgres",
|
||||
"tokio-stream",
|
||||
"tokio-tar",
|
||||
"tokio-util",
|
||||
"toml_edit",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
"url",
|
||||
"utils",
|
||||
"walkdir",
|
||||
@@ -3556,7 +3574,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "postgres"
|
||||
version = "0.19.4"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=ce7260db5998fe27167da42503905a12e7ad9048#ce7260db5998fe27167da42503905a12e7ad9048"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#ef8559b5f60f5c1d2b0184a62f49035600824518"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"fallible-iterator",
|
||||
@@ -3569,7 +3587,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "postgres-native-tls"
|
||||
version = "0.5.0"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=ce7260db5998fe27167da42503905a12e7ad9048#ce7260db5998fe27167da42503905a12e7ad9048"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#ef8559b5f60f5c1d2b0184a62f49035600824518"
|
||||
dependencies = [
|
||||
"native-tls",
|
||||
"tokio",
|
||||
@@ -3580,7 +3598,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "postgres-protocol"
|
||||
version = "0.6.4"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=ce7260db5998fe27167da42503905a12e7ad9048#ce7260db5998fe27167da42503905a12e7ad9048"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#ef8559b5f60f5c1d2b0184a62f49035600824518"
|
||||
dependencies = [
|
||||
"base64 0.20.0",
|
||||
"byteorder",
|
||||
@@ -3598,7 +3616,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "postgres-types"
|
||||
version = "0.2.4"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=ce7260db5998fe27167da42503905a12e7ad9048#ce7260db5998fe27167da42503905a12e7ad9048"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#ef8559b5f60f5c1d2b0184a62f49035600824518"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"fallible-iterator",
|
||||
@@ -4064,7 +4082,6 @@ dependencies = [
|
||||
"aws-config",
|
||||
"aws-credential-types",
|
||||
"aws-sdk-s3",
|
||||
"aws-smithy-async",
|
||||
"aws-smithy-http",
|
||||
"aws-types",
|
||||
"azure_core",
|
||||
@@ -5415,7 +5432,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "tokio-postgres"
|
||||
version = "0.7.7"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=ce7260db5998fe27167da42503905a12e7ad9048#ce7260db5998fe27167da42503905a12e7ad9048"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#ef8559b5f60f5c1d2b0184a62f49035600824518"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"byteorder",
|
||||
@@ -5772,6 +5789,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77"
|
||||
dependencies = [
|
||||
"matchers",
|
||||
"nu-ansi-term",
|
||||
"once_cell",
|
||||
"regex",
|
||||
"serde",
|
||||
@@ -6483,7 +6501,6 @@ dependencies = [
|
||||
"clap",
|
||||
"clap_builder",
|
||||
"crossbeam-utils",
|
||||
"dashmap",
|
||||
"either",
|
||||
"fail",
|
||||
"futures",
|
||||
|
||||
15
Cargo.toml
15
Cargo.toml
@@ -48,7 +48,6 @@ async-trait = "0.1"
|
||||
aws-config = { version = "0.56", default-features = false, features=["rustls"] }
|
||||
aws-sdk-s3 = "0.29"
|
||||
aws-smithy-http = "0.56"
|
||||
aws-smithy-async = { version = "0.56", default-features = false, features=["rt-tokio"] }
|
||||
aws-credential-types = "0.56"
|
||||
aws-types = "0.56"
|
||||
axum = { version = "0.6.20", features = ["ws"] }
|
||||
@@ -67,7 +66,7 @@ comfy-table = "6.1"
|
||||
const_format = "0.2"
|
||||
crc32c = "0.6"
|
||||
crossbeam-utils = "0.8.5"
|
||||
dashmap = { version = "5.5.0", features = ["raw-api"] }
|
||||
dashmap = "5.5.0"
|
||||
either = "1.8"
|
||||
enum-map = "2.4.2"
|
||||
enumset = "1.0.12"
|
||||
@@ -164,11 +163,11 @@ env_logger = "0.10"
|
||||
log = "0.4"
|
||||
|
||||
## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed
|
||||
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
|
||||
postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
|
||||
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
|
||||
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
|
||||
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
|
||||
postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
|
||||
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
|
||||
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
|
||||
|
||||
## Other git libraries
|
||||
heapless = { default-features=false, features=[], git = "https://github.com/japaric/heapless.git", rev = "644653bf3b831c6bb4963be2de24804acf5e5001" } # upstream release pending
|
||||
@@ -205,7 +204,7 @@ tonic-build = "0.9"
|
||||
|
||||
# This is only needed for proxy's tests.
|
||||
# TODO: we should probably fork `tokio-postgres-rustls` instead.
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
|
||||
|
||||
################# Binary contents sections
|
||||
|
||||
|
||||
@@ -268,6 +268,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
logging::init(
|
||||
LogFormat::Plain,
|
||||
logging::TracingErrorLayerEnablement::Disabled,
|
||||
logging::Output::Stdout,
|
||||
)?;
|
||||
|
||||
let args = Cli::parse();
|
||||
|
||||
@@ -18,7 +18,7 @@ use utils::{
|
||||
|
||||
use crate::reltag::RelTag;
|
||||
use anyhow::bail;
|
||||
use bytes::{BufMut, Bytes, BytesMut};
|
||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
|
||||
/// The state of a tenant in this pageserver.
|
||||
///
|
||||
@@ -371,6 +371,8 @@ pub struct TenantInfo {
|
||||
/// If a layer is present in both local FS and S3, it counts only once.
|
||||
pub current_physical_size: Option<u64>, // physical size is only included in `tenant_status` endpoint
|
||||
pub attachment_status: TenantAttachmentStatus,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub generation: Option<u32>,
|
||||
}
|
||||
|
||||
/// This represents the output of the "timeline_detail" and "timeline_list" API calls.
|
||||
@@ -515,6 +517,8 @@ pub enum HistoricLayerInfo {
|
||||
lsn_end: Lsn,
|
||||
remote: bool,
|
||||
access_stats: LayerAccessStats,
|
||||
|
||||
remote_path: Option<String>,
|
||||
},
|
||||
Image {
|
||||
layer_file_name: String,
|
||||
@@ -523,6 +527,8 @@ pub enum HistoricLayerInfo {
|
||||
lsn_start: Lsn,
|
||||
remote: bool,
|
||||
access_stats: LayerAccessStats,
|
||||
|
||||
remote_path: Option<String>,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -565,6 +571,7 @@ pub struct TimelineGcRequest {
|
||||
pub gc_horizon: Option<u64>,
|
||||
}
|
||||
|
||||
|
||||
// Wrapped in libpq CopyData
|
||||
#[derive(PartialEq, Eq, Debug)]
|
||||
pub enum PagestreamFeMessage {
|
||||
@@ -767,6 +774,36 @@ impl PagestreamBeMessage {
|
||||
|
||||
bytes.into()
|
||||
}
|
||||
|
||||
pub fn deserialize(buf: Bytes) -> anyhow::Result<Self> {
|
||||
let mut buf = buf.reader();
|
||||
let msg_tag = buf.read_u8()?;
|
||||
match msg_tag {
|
||||
100 => todo!(),
|
||||
101 => todo!(),
|
||||
102 => {
|
||||
let buf = buf.get_ref();
|
||||
/* TODO use constant */
|
||||
if buf.len() == 8192 {
|
||||
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
|
||||
page: buf.clone(),
|
||||
}))
|
||||
} else {
|
||||
anyhow::bail!("invalid page size: {}", buf.len());
|
||||
}
|
||||
}
|
||||
103 => {
|
||||
let buf = buf.get_ref();
|
||||
let cstr = std::ffi::CStr::from_bytes_until_nul(buf)?;
|
||||
let rust_str = cstr.to_str()?;
|
||||
Ok(PagestreamBeMessage::Error(PagestreamErrorResponse {
|
||||
message: rust_str.to_owned(),
|
||||
}))
|
||||
}
|
||||
104 => todo!(),
|
||||
_ => bail!("unknown tag: {:?}", msg_tag),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -832,6 +869,7 @@ mod tests {
|
||||
state: TenantState::Active,
|
||||
current_physical_size: Some(42),
|
||||
attachment_status: TenantAttachmentStatus::Attached,
|
||||
generation: None,
|
||||
};
|
||||
let expected_active = json!({
|
||||
"id": original_active.id.to_string(),
|
||||
@@ -852,6 +890,7 @@ mod tests {
|
||||
},
|
||||
current_physical_size: Some(42),
|
||||
attachment_status: TenantAttachmentStatus::Attached,
|
||||
generation: None,
|
||||
};
|
||||
let expected_broken = json!({
|
||||
"id": original_broken.id.to_string(),
|
||||
|
||||
@@ -8,7 +8,6 @@ license.workspace = true
|
||||
anyhow.workspace = true
|
||||
async-trait.workspace = true
|
||||
once_cell.workspace = true
|
||||
aws-smithy-async.workspace = true
|
||||
aws-smithy-http.workspace = true
|
||||
aws-types.workspace = true
|
||||
aws-config.workspace = true
|
||||
|
||||
@@ -81,6 +81,12 @@ impl std::fmt::Display for RemotePath {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<RemotePath> for String {
|
||||
fn from(val: RemotePath) -> Self {
|
||||
val.0.into()
|
||||
}
|
||||
}
|
||||
|
||||
impl RemotePath {
|
||||
pub fn new(relative_path: &Utf8Path) -> anyhow::Result<Self> {
|
||||
anyhow::ensure!(
|
||||
@@ -102,7 +108,7 @@ impl RemotePath {
|
||||
self.0.file_name()
|
||||
}
|
||||
|
||||
pub fn join(&self, segment: &Utf8Path) -> Self {
|
||||
pub fn join<P: AsRef<Utf8Path>>(&self, segment: P) -> Self {
|
||||
Self(self.0.join(segment))
|
||||
}
|
||||
|
||||
|
||||
@@ -4,27 +4,23 @@
|
||||
//! allowing multiple api users to independently work with the same S3 bucket, if
|
||||
//! their bucket prefixes are both specified and different.
|
||||
|
||||
use std::{borrow::Cow, sync::Arc};
|
||||
use std::borrow::Cow;
|
||||
|
||||
use anyhow::Context;
|
||||
use aws_config::{
|
||||
environment::credentials::EnvironmentVariableCredentialsProvider,
|
||||
imds::credentials::ImdsCredentialsProvider,
|
||||
meta::credentials::CredentialsProviderChain,
|
||||
provider_config::ProviderConfig,
|
||||
retry::{RetryConfigBuilder, RetryMode},
|
||||
web_identity_token::WebIdentityTokenCredentialsProvider,
|
||||
imds::credentials::ImdsCredentialsProvider, meta::credentials::CredentialsProviderChain,
|
||||
provider_config::ProviderConfig, web_identity_token::WebIdentityTokenCredentialsProvider,
|
||||
};
|
||||
use aws_credential_types::cache::CredentialsCache;
|
||||
use aws_sdk_s3::{
|
||||
config::{AsyncSleep, Config, Region, SharedAsyncSleep},
|
||||
config::{Config, Region},
|
||||
error::SdkError,
|
||||
operation::get_object::GetObjectError,
|
||||
primitives::ByteStream,
|
||||
types::{Delete, ObjectIdentifier},
|
||||
Client,
|
||||
};
|
||||
use aws_smithy_async::rt::sleep::TokioSleep;
|
||||
use aws_smithy_http::body::SdkBody;
|
||||
use hyper::Body;
|
||||
use scopeguard::ScopeGuard;
|
||||
@@ -87,23 +83,10 @@ impl S3Bucket {
|
||||
.or_else("imds", ImdsCredentialsProvider::builder().build())
|
||||
};
|
||||
|
||||
// AWS SDK requires us to specify how the RetryConfig should sleep when it wants to back off
|
||||
let sleep_impl: Arc<dyn AsyncSleep> = Arc::new(TokioSleep::new());
|
||||
|
||||
// We do our own retries (see [`backoff::retry`]). However, for the AWS SDK to enable rate limiting in response to throttling
|
||||
// responses (e.g. 429 on too many ListObjectsv2 requests), we must provide a retry config. We set it to use at most one
|
||||
// attempt, and enable 'Adaptive' mode, which causes rate limiting to be enabled.
|
||||
let mut retry_config = RetryConfigBuilder::new();
|
||||
retry_config
|
||||
.set_max_attempts(Some(1))
|
||||
.set_mode(Some(RetryMode::Adaptive));
|
||||
|
||||
let mut config_builder = Config::builder()
|
||||
.region(region)
|
||||
.credentials_cache(CredentialsCache::lazy())
|
||||
.credentials_provider(credentials_provider)
|
||||
.sleep_impl(SharedAsyncSleep::from(sleep_impl))
|
||||
.retry_config(retry_config.build());
|
||||
.credentials_provider(credentials_provider);
|
||||
|
||||
if let Some(custom_endpoint) = aws_config.endpoint.clone() {
|
||||
config_builder = config_builder
|
||||
|
||||
@@ -210,6 +210,7 @@ fn ensure_logging_ready() {
|
||||
utils::logging::init(
|
||||
utils::logging::LogFormat::Test,
|
||||
utils::logging::TracingErrorLayerEnablement::Disabled,
|
||||
utils::logging::Output::Stdout,
|
||||
)
|
||||
.expect("logging init failed");
|
||||
});
|
||||
|
||||
@@ -66,9 +66,17 @@ pub enum TracingErrorLayerEnablement {
|
||||
EnableWithRustLogFilter,
|
||||
}
|
||||
|
||||
/// Where the logging should output to.
|
||||
#[derive(Clone, Copy)]
|
||||
pub enum Output {
|
||||
Stdout,
|
||||
Stderr,
|
||||
}
|
||||
|
||||
pub fn init(
|
||||
log_format: LogFormat,
|
||||
tracing_error_layer_enablement: TracingErrorLayerEnablement,
|
||||
output: Output,
|
||||
) -> anyhow::Result<()> {
|
||||
// We fall back to printing all spans at info-level or above if
|
||||
// the RUST_LOG environment variable is not set.
|
||||
@@ -85,7 +93,12 @@ pub fn init(
|
||||
let log_layer = tracing_subscriber::fmt::layer()
|
||||
.with_target(false)
|
||||
.with_ansi(false)
|
||||
.with_writer(std::io::stdout);
|
||||
.with_writer(move || -> Box<dyn std::io::Write> {
|
||||
match output {
|
||||
Output::Stdout => Box::new(std::io::stdout()),
|
||||
Output::Stderr => Box::new(std::io::stderr()),
|
||||
}
|
||||
});
|
||||
let log_layer = match log_format {
|
||||
LogFormat::Json => log_layer.json().boxed(),
|
||||
LogFormat::Plain => log_layer.boxed(),
|
||||
|
||||
@@ -125,9 +125,6 @@ where
|
||||
// Wake everyone with an error.
|
||||
let mut internal = self.internal.lock().unwrap();
|
||||
|
||||
// Block any future waiters from starting
|
||||
internal.shutdown = true;
|
||||
|
||||
// This will steal the entire waiters map.
|
||||
// When we drop it all waiters will be woken.
|
||||
mem::take(&mut internal.waiters)
|
||||
|
||||
@@ -82,6 +82,8 @@ enum-map.workspace = true
|
||||
enumset.workspace = true
|
||||
strum.workspace = true
|
||||
strum_macros.workspace = true
|
||||
tokio-stream.workspace = true
|
||||
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
|
||||
|
||||
[dev-dependencies]
|
||||
criterion.workspace = true
|
||||
|
||||
@@ -1,13 +1,15 @@
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
use anyhow::Result;
|
||||
use camino::Utf8Path;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use clap::Subcommand;
|
||||
use pageserver::context::{DownloadBehavior, RequestContext};
|
||||
use pageserver::task_mgr::TaskKind;
|
||||
use pageserver::tenant::block_io::BlockCursor;
|
||||
use pageserver::tenant::disk_btree::DiskBtreeReader;
|
||||
use pageserver::tenant::storage_layer::delta_layer::{BlobRef, Summary};
|
||||
use pageserver::tenant::storage_layer::{delta_layer, image_layer};
|
||||
use pageserver::tenant::storage_layer::{DeltaLayer, ImageLayer};
|
||||
use pageserver::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
|
||||
use pageserver::{page_cache, virtual_file};
|
||||
use pageserver::{
|
||||
@@ -20,6 +22,7 @@ use pageserver::{
|
||||
};
|
||||
use std::fs;
|
||||
use utils::bin_ser::BeSer;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
use crate::layer_map_analyzer::parse_filename;
|
||||
|
||||
@@ -45,6 +48,13 @@ pub(crate) enum LayerCmd {
|
||||
/// The id from list-layer command
|
||||
id: usize,
|
||||
},
|
||||
RewriteSummary {
|
||||
layer_file_path: Utf8PathBuf,
|
||||
#[clap(long)]
|
||||
new_tenant_id: Option<TenantId>,
|
||||
#[clap(long)]
|
||||
new_timeline_id: Option<TimelineId>,
|
||||
},
|
||||
}
|
||||
|
||||
async fn read_delta_file(path: impl AsRef<Path>, ctx: &RequestContext) -> Result<()> {
|
||||
@@ -100,6 +110,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
|
||||
println!("- timeline {}", timeline.file_name().to_string_lossy());
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
LayerCmd::ListLayer {
|
||||
path,
|
||||
@@ -128,6 +139,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
|
||||
idx += 1;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
LayerCmd::DumpLayer {
|
||||
path,
|
||||
@@ -168,7 +180,63 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
|
||||
idx += 1;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
LayerCmd::RewriteSummary {
|
||||
layer_file_path,
|
||||
new_tenant_id,
|
||||
new_timeline_id,
|
||||
} => {
|
||||
pageserver::virtual_file::init(10);
|
||||
pageserver::page_cache::init(100);
|
||||
|
||||
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
|
||||
|
||||
macro_rules! rewrite_closure {
|
||||
($($summary_ty:tt)*) => {{
|
||||
|summary| $($summary_ty)* {
|
||||
tenant_id: new_tenant_id.unwrap_or(summary.tenant_id),
|
||||
timeline_id: new_timeline_id.unwrap_or(summary.timeline_id),
|
||||
..summary
|
||||
}
|
||||
}};
|
||||
}
|
||||
|
||||
let res = ImageLayer::rewrite_summary(
|
||||
layer_file_path,
|
||||
rewrite_closure!(image_layer::Summary),
|
||||
&ctx,
|
||||
)
|
||||
.await;
|
||||
match res {
|
||||
Ok(()) => {
|
||||
println!("Successfully rewrote summary of image layer {layer_file_path}");
|
||||
return Ok(());
|
||||
}
|
||||
Err(image_layer::RewriteSummaryError::MagicMismatch) => (), // fallthrough
|
||||
Err(image_layer::RewriteSummaryError::Other(e)) => {
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
|
||||
let res = DeltaLayer::rewrite_summary(
|
||||
layer_file_path,
|
||||
rewrite_closure!(delta_layer::Summary),
|
||||
&ctx,
|
||||
)
|
||||
.await;
|
||||
match res {
|
||||
Ok(()) => {
|
||||
println!("Successfully rewrote summary of delta layer {layer_file_path}");
|
||||
return Ok(());
|
||||
}
|
||||
Err(delta_layer::RewriteSummaryError::MagicMismatch) => (), // fallthrough
|
||||
Err(delta_layer::RewriteSummaryError::Other(e)) => {
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
|
||||
anyhow::bail!("not an image or delta layer: {layer_file_path}");
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
422
pageserver/src/bin/getpage_bench_libpq.rs
Normal file
422
pageserver/src/bin/getpage_bench_libpq.rs
Normal file
@@ -0,0 +1,422 @@
|
||||
use anyhow::Context;
|
||||
use clap::Parser;
|
||||
|
||||
use hyper::client::HttpConnector;
|
||||
use hyper::{Client, Uri};
|
||||
|
||||
use pageserver::pgdatadir_mapping::{is_rel_block_key, key_to_rel_block};
|
||||
use pageserver::repository;
|
||||
|
||||
use pageserver_api::reltag::RelTag;
|
||||
use rand::prelude::*;
|
||||
use tokio::sync::Barrier;
|
||||
use tracing::info;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
use utils::logging;
|
||||
|
||||
use std::future::Future;
|
||||
use std::ops::Range;
|
||||
use std::str::FromStr;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
struct Key(repository::Key);
|
||||
|
||||
impl std::str::FromStr for Key {
|
||||
type Err = anyhow::Error;
|
||||
|
||||
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
|
||||
repository::Key::from_hex(s).map(Key)
|
||||
}
|
||||
}
|
||||
|
||||
struct KeyRange {
|
||||
start: i128,
|
||||
end: i128,
|
||||
}
|
||||
|
||||
impl KeyRange {
|
||||
fn len(&self) -> i128 {
|
||||
self.end - self.start
|
||||
}
|
||||
}
|
||||
|
||||
struct RelTagBlockNo {
|
||||
rel_tag: RelTag,
|
||||
block_no: u32,
|
||||
}
|
||||
|
||||
#[derive(clap::Parser)]
|
||||
struct Args {
|
||||
#[clap(long, default_value = "http://localhost:9898")]
|
||||
mgmt_api_endpoint: String,
|
||||
#[clap(long, default_value = "postgres://postgres@localhost:64000")]
|
||||
page_service_connstring: String,
|
||||
// tenant_id: String,
|
||||
// timeline_id: String,
|
||||
#[clap(long)]
|
||||
num_tasks: usize,
|
||||
#[clap(long)]
|
||||
num_requests: usize,
|
||||
#[clap(long)]
|
||||
pick_n_tenants: Option<usize>,
|
||||
tenants: Option<Vec<TenantId>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
struct LiveStats {
|
||||
completed_requests: AtomicU64,
|
||||
}
|
||||
|
||||
impl LiveStats {
|
||||
fn inc(&self) {
|
||||
self.completed_requests.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
logging::init(
|
||||
logging::LogFormat::Plain,
|
||||
logging::TracingErrorLayerEnablement::Disabled,
|
||||
logging::Output::Stderr,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let args: &'static Args = Box::leak(Box::new(Args::parse()));
|
||||
|
||||
let client = Arc::new(mgmt_api_client::Client::new(args.mgmt_api_endpoint.into()));
|
||||
|
||||
let mut tenants: Vec<TenantId> = if let Some(tenants) = &args.tenants {
|
||||
tenants.clone()
|
||||
} else {
|
||||
client
|
||||
.list_tenants()
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|ti| ti.id)
|
||||
.collect()
|
||||
};
|
||||
let tenants = if let Some(n) = args.pick_n_tenants {
|
||||
tenants.truncate(n);
|
||||
if tenants.len() != n {
|
||||
anyhow::bail!("too few tenants: {} < {}", tenants.len(), n);
|
||||
}
|
||||
tenants
|
||||
} else {
|
||||
tenants
|
||||
};
|
||||
|
||||
let mut tenant_timelines = Vec::new();
|
||||
for tenant_id in tenants {
|
||||
tenant_timelines.extend(
|
||||
client
|
||||
.list_timelines(tenant_id)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|ti| (tenant_id, ti.timeline_id)),
|
||||
);
|
||||
}
|
||||
info!("tenant_timelines:\n{:?}", tenant_timelines);
|
||||
|
||||
let stats = Arc::new(LiveStats::default());
|
||||
|
||||
let num_work_tasks = tenant_timelines.len() * args.num_tasks;
|
||||
|
||||
let start_work_barrier = Arc::new(tokio::sync::Barrier::new(num_work_tasks + 1));
|
||||
|
||||
tokio::spawn({
|
||||
let stats = Arc::clone(&stats);
|
||||
let start_work_barrier = Arc::clone(&start_work_barrier);
|
||||
async move {
|
||||
start_work_barrier.wait().await;
|
||||
loop {
|
||||
let start = std::time::Instant::now();
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
let completed_requests = stats.completed_requests.swap(0, Ordering::Relaxed);
|
||||
let elapsed = start.elapsed();
|
||||
info!(
|
||||
"RPS: {:.0}",
|
||||
completed_requests as f64 / elapsed.as_secs_f64()
|
||||
);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let mut tasks = Vec::new();
|
||||
for (tenant_id, timeline_id) in tenant_timelines {
|
||||
let stats = Arc::clone(&stats);
|
||||
let t = tokio::spawn(timeline(
|
||||
args,
|
||||
client.clone(),
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
Arc::clone(&start_work_barrier),
|
||||
stats,
|
||||
));
|
||||
tasks.push(t);
|
||||
}
|
||||
|
||||
for t in tasks {
|
||||
t.await.unwrap();
|
||||
}
|
||||
|
||||
anyhow::Ok(())
|
||||
}
|
||||
|
||||
fn timeline(
|
||||
args: &'static Args,
|
||||
mgmt_api_client: Arc<mgmt_api_client::Client>,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
start_work_barrier: Arc<Barrier>,
|
||||
stats: Arc<LiveStats>,
|
||||
) -> impl Future<Output = ()> + Send + Sync {
|
||||
async move {
|
||||
let partitioning = mgmt_api_client.keyspace(tenant_id, timeline_id).await?;
|
||||
let lsn = partitioning.at_lsn;
|
||||
let ranges = partitioning
|
||||
.keys
|
||||
.ranges
|
||||
.iter()
|
||||
.filter_map(|r| {
|
||||
let start = r.start;
|
||||
let end = r.end;
|
||||
// filter out non-relblock keys
|
||||
match (is_rel_block_key(start), is_rel_block_key(end)) {
|
||||
(true, true) => Some(KeyRange {
|
||||
start: start.to_i128(),
|
||||
end: end.to_i128(),
|
||||
}),
|
||||
(true, false) | (false, true) => {
|
||||
unimplemented!("split up range")
|
||||
}
|
||||
(false, false) => None,
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// weighted ranges
|
||||
let weights = ranges.iter().map(|r| r.len()).collect::<Vec<_>>();
|
||||
|
||||
let ranges = Arc::new(ranges);
|
||||
let weights = Arc::new(weights);
|
||||
|
||||
let mut tasks = Vec::<JoinHandle<()>>::new();
|
||||
|
||||
for _i in 0..args.num_tasks {
|
||||
let ranges = ranges.clone();
|
||||
let _weights = weights.clone();
|
||||
let start_work_barrier = Arc::clone(&start_work_barrier);
|
||||
let task = tokio::spawn({
|
||||
let stats = Arc::clone(&stats);
|
||||
async move {
|
||||
let mut getpage_client = getpage_client::Client::new(
|
||||
args.page_service_connstring.clone(),
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
start_work_barrier.wait().await;
|
||||
for _i in 0..args.num_requests {
|
||||
let key = {
|
||||
let mut rng = rand::thread_rng();
|
||||
let r = ranges.choose_weighted(&mut rng, |r| r.len()).unwrap();
|
||||
let key: i128 = rng.gen_range(r.start..r.end);
|
||||
let key = repository::Key::from_i128(key);
|
||||
// XXX filter these out when we iterate the keyspace
|
||||
assert!(
|
||||
is_rel_block_key(key),
|
||||
"we filter non-relblock keys out above"
|
||||
);
|
||||
let (rel_tag, block_no) =
|
||||
key_to_rel_block(key).expect("we just checked");
|
||||
RelTagBlockNo { rel_tag, block_no }
|
||||
};
|
||||
getpage_client
|
||||
.getpage(key, lsn)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!("getpage for tenant {} timeline {}", tenant_id, timeline_id)
|
||||
})
|
||||
.unwrap();
|
||||
stats.inc();
|
||||
}
|
||||
getpage_client.shutdown().await;
|
||||
}
|
||||
});
|
||||
tasks.push(task);
|
||||
}
|
||||
|
||||
for task in tasks {
|
||||
task.await.unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
mod mgmt_api_client {
|
||||
use bytes::{Buf, Bytes, BytesMut};
|
||||
use hyper::{client::HttpConnector, Uri};
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
pub(crate) struct Client {
|
||||
mgmt_api_endpoint: String,
|
||||
pub(crate) client: hyper::Client<HttpConnector, hyper::Body>,
|
||||
}
|
||||
|
||||
impl Client {
|
||||
pub fn new(mgmt_api_endpoint: Bytes) -> Self {
|
||||
Self {
|
||||
mgmt_api_endpoint,
|
||||
client: hyper::client::Client::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn list_tenants(
|
||||
&self,
|
||||
) -> anyhow::Result<Vec<pageserver_api::models::TenantInfo>> {
|
||||
let uri = Uri::try_from(format!("{}/v1/tenant", self.mgmt_api_endpoint))?;
|
||||
let resp = self.client.get(uri).await?;
|
||||
if !resp.status().is_success() {
|
||||
anyhow::bail!("status error");
|
||||
}
|
||||
let body = hyper::body::to_bytes(resp).await?;
|
||||
Ok(serde_json::from_slice(&body)?)
|
||||
}
|
||||
|
||||
pub async fn list_timelines(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
) -> anyhow::Result<Vec<pageserver_api::models::TimelineInfo>> {
|
||||
let uri = Uri::try_from(format!(
|
||||
"{}/v1/tenant/{tenant_id}/timeline",
|
||||
self.mgmt_api_endpoint
|
||||
))?;
|
||||
let resp = self.client.get(uri).await?;
|
||||
if !resp.status().is_success() {
|
||||
anyhow::bail!("status error");
|
||||
}
|
||||
let body = hyper::body::to_bytes(resp).await?;
|
||||
Ok(serde_json::from_slice(&body)?)
|
||||
}
|
||||
|
||||
pub async fn keyspace(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
) -> anyhow::Result<pageserver::http::models::Partitioning> {
|
||||
let uri = Uri::try_from(format!(
|
||||
"{}/v1/tenant/{tenant_id}/timeline/{timeline_id}/keyspace",
|
||||
self.mgmt_api_endpoint
|
||||
))?;
|
||||
let resp = self.client.get(uri).await?;
|
||||
if !resp.status().is_success() {
|
||||
anyhow::bail!("status error");
|
||||
}
|
||||
let body = hyper::body::to_bytes(resp).await?;
|
||||
Ok(serde_json::from_slice(&body))
|
||||
|
||||
// let keyspace: serde_json::Value = serde_json::from_slice(&body).unwrap();
|
||||
// let lsn: Lsn = keyspace["at_lsn"].as_str().unwrap().parse().unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
mod getpage_client {
|
||||
use std::pin::Pin;
|
||||
|
||||
use futures::SinkExt;
|
||||
use pageserver_api::models::{
|
||||
PagestreamBeMessage, PagestreamFeMessage, PagestreamGetPageRequest,
|
||||
PagestreamGetPageResponse,
|
||||
};
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio_stream::StreamExt;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::{
|
||||
id::{TenantId, TimelineId},
|
||||
lsn::Lsn,
|
||||
};
|
||||
|
||||
use crate::RelTagBlockNo;
|
||||
|
||||
pub(crate) struct Client {
|
||||
copy_both: Pin<Box<tokio_postgres::CopyBothDuplex<bytes::Bytes>>>,
|
||||
cancel_on_client_drop: Option<tokio_util::sync::DropGuard>,
|
||||
conn_task: JoinHandle<()>,
|
||||
}
|
||||
|
||||
impl Client {
|
||||
pub async fn new(
|
||||
connstring: String,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
) -> anyhow::Result<Self> {
|
||||
let (client, connection) =
|
||||
tokio_postgres::connect(&connstring, postgres::NoTls).await?;
|
||||
|
||||
let conn_task_cancel = CancellationToken::new();
|
||||
let conn_task = tokio::spawn({
|
||||
let conn_task_cancel = conn_task_cancel.clone();
|
||||
async move {
|
||||
tokio::select! {
|
||||
_ = conn_task_cancel.cancelled() => { }
|
||||
res = connection => {
|
||||
res.unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let copy_both: tokio_postgres::CopyBothDuplex<bytes::Bytes> = client
|
||||
.copy_both_simple(&format!("pagestream {tenant_id} {timeline_id}"))
|
||||
.await?;
|
||||
|
||||
Ok(Self {
|
||||
copy_both: Box::pin(copy_both),
|
||||
conn_task,
|
||||
cancel_on_client_drop: Some(conn_task_cancel.drop_guard()),
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn shutdown(mut self) {
|
||||
let _ = self.cancel_on_client_drop.take();
|
||||
self.conn_task.await.unwrap();
|
||||
}
|
||||
|
||||
pub async fn getpage(
|
||||
&mut self,
|
||||
key: RelTagBlockNo,
|
||||
lsn: Lsn,
|
||||
) -> anyhow::Result<PagestreamGetPageResponse> {
|
||||
let req = PagestreamGetPageRequest {
|
||||
latest: false,
|
||||
rel: key.rel_tag,
|
||||
blkno: key.block_no,
|
||||
lsn,
|
||||
};
|
||||
let req = PagestreamFeMessage::GetPage(req);
|
||||
let req: bytes::Bytes = req.serialize();
|
||||
// let mut req = tokio_util::io::ReaderStream::new(&req);
|
||||
let mut req = tokio_stream::once(Ok(req));
|
||||
|
||||
self.copy_both.send_all(&mut req).await?;
|
||||
|
||||
let next: Option<Result<bytes::Bytes, _>> = self.copy_both.next().await;
|
||||
let next = next.unwrap().unwrap();
|
||||
|
||||
match PagestreamBeMessage::deserialize(next)? {
|
||||
PagestreamBeMessage::Exists(_) => todo!(),
|
||||
PagestreamBeMessage::Nblocks(_) => todo!(),
|
||||
PagestreamBeMessage::GetPage(p) => Ok(p),
|
||||
PagestreamBeMessage::Error(e) => anyhow::bail!("Error: {:?}", e),
|
||||
PagestreamBeMessage::DbSize(_) => todo!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -103,7 +103,11 @@ fn main() -> anyhow::Result<()> {
|
||||
} else {
|
||||
TracingErrorLayerEnablement::Disabled
|
||||
};
|
||||
logging::init(conf.log_format, tracing_error_layer_enablement)?;
|
||||
logging::init(
|
||||
conf.log_format,
|
||||
tracing_error_layer_enablement,
|
||||
logging::Output::Stdout,
|
||||
)?;
|
||||
|
||||
// mind the order required here: 1. logging, 2. panic_hook, 3. sentry.
|
||||
// disarming this hook on pageserver, because we never tear down tracing.
|
||||
|
||||
@@ -261,7 +261,7 @@ async fn calculate_synthetic_size_worker(
|
||||
}
|
||||
};
|
||||
|
||||
for (tenant_id, tenant_state) in tenants {
|
||||
for (tenant_id, tenant_state, _gen) in tenants {
|
||||
if tenant_state != TenantState::Active {
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -197,7 +197,7 @@ pub(super) async fn collect_all_metrics(
|
||||
}
|
||||
};
|
||||
|
||||
let tenants = futures::stream::iter(tenants).filter_map(|(id, state)| async move {
|
||||
let tenants = futures::stream::iter(tenants).filter_map(|(id, state, _)| async move {
|
||||
if state != TenantState::Active {
|
||||
None
|
||||
} else {
|
||||
|
||||
@@ -345,7 +345,7 @@ impl DeletionList {
|
||||
result.extend(
|
||||
timeline_layers
|
||||
.into_iter()
|
||||
.map(|l| timeline_remote_path.join(&Utf8PathBuf::from(l))),
|
||||
.map(|l| timeline_remote_path.join(Utf8PathBuf::from(l))),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -541,7 +541,7 @@ async fn collect_eviction_candidates(
|
||||
|
||||
let mut candidates = Vec::new();
|
||||
|
||||
for (tenant_id, _state) in &tenants {
|
||||
for (tenant_id, _state, _gen) in &tenants {
|
||||
if cancel.is_cancelled() {
|
||||
return Ok(EvictionCandidates::Cancelled);
|
||||
}
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
pub mod routes;
|
||||
pub use routes::make_router;
|
||||
|
||||
pub use pageserver_api::models;
|
||||
// pub use pageserver_api::models;
|
||||
|
||||
pub mod models;
|
||||
|
||||
66
pageserver/src/http/models.rs
Normal file
66
pageserver/src/http/models.rs
Normal file
@@ -0,0 +1,66 @@
|
||||
//! If possible, use `::pageserver_api::models` instead.
|
||||
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
pub struct Partitioning {
|
||||
pub keys: crate::keyspace::KeySpace,
|
||||
|
||||
pub at_lsn: Lsn,
|
||||
}
|
||||
|
||||
impl serde::Serialize for Partitioning {
|
||||
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
use serde::ser::SerializeMap;
|
||||
let mut map = serializer.serialize_map(Some(2))?;
|
||||
map.serialize_key("keys")?;
|
||||
map.serialize_value(&KeySpace(&self.keys))?;
|
||||
map.serialize_key("at_lsn")?;
|
||||
map.serialize_value(&WithDisplay(&self.at_lsn))?;
|
||||
map.end()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct WithDisplay<'a, T>(&'a T);
|
||||
|
||||
impl<'a, T: std::fmt::Display> serde::Serialize for WithDisplay<'a, T> {
|
||||
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
serializer.collect_str(&self.0)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct KeySpace<'a>(&'a crate::keyspace::KeySpace);
|
||||
|
||||
impl<'a> serde::Serialize for KeySpace<'a> {
|
||||
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
use serde::ser::SerializeSeq;
|
||||
let mut seq = serializer.serialize_seq(Some(self.0.ranges.len()))?;
|
||||
for kr in &self.0.ranges {
|
||||
seq.serialize_element(&KeyRange(kr))?;
|
||||
}
|
||||
seq.end()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct KeyRange<'a>(&'a std::ops::Range<crate::repository::Key>);
|
||||
|
||||
impl<'a> serde::Serialize for KeyRange<'a> {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
use serde::ser::SerializeTuple;
|
||||
let mut t = serializer.serialize_tuple(2)?;
|
||||
t.serialize_element(&WithDisplay(&self.0.start))?;
|
||||
t.serialize_element(&WithDisplay(&self.0.end))?;
|
||||
t.end()
|
||||
}
|
||||
}
|
||||
@@ -352,8 +352,7 @@ paths:
|
||||
in: query
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
format: hex
|
||||
type: integer
|
||||
description: A LSN to get the timestamp
|
||||
responses:
|
||||
"200":
|
||||
|
||||
@@ -25,7 +25,7 @@ use utils::http::endpoint::request_span;
|
||||
use utils::http::json::json_request_or_empty_body;
|
||||
use utils::http::request::{get_request_param, must_get_query_param, parse_query_param};
|
||||
|
||||
use super::models::{
|
||||
use pageserver_api::models::{
|
||||
StatusResponse, TenantConfigRequest, TenantCreateRequest, TenantCreateResponse, TenantInfo,
|
||||
TimelineCreateRequest, TimelineGcRequest, TimelineInfo,
|
||||
};
|
||||
@@ -60,7 +60,7 @@ use utils::{
|
||||
};
|
||||
|
||||
// Imports only used for testing APIs
|
||||
use super::models::ConfigureFailpointsRequest;
|
||||
use pageserver_api::models::ConfigureFailpointsRequest;
|
||||
|
||||
pub struct State {
|
||||
conf: &'static PageServerConf,
|
||||
@@ -303,7 +303,11 @@ async fn build_timeline_info(
|
||||
// we're executing this function, we will outlive the timeline on-disk state.
|
||||
info.current_logical_size_non_incremental = Some(
|
||||
timeline
|
||||
.get_current_logical_size_non_incremental(info.last_record_lsn, ctx)
|
||||
.get_current_logical_size_non_incremental(
|
||||
info.last_record_lsn,
|
||||
CancellationToken::new(),
|
||||
ctx,
|
||||
)
|
||||
.await?,
|
||||
);
|
||||
}
|
||||
@@ -756,11 +760,12 @@ async fn tenant_list_handler(
|
||||
ApiError::ResourceUnavailable("Tenant map is initializing or shutting down".into())
|
||||
})?
|
||||
.iter()
|
||||
.map(|(id, state)| TenantInfo {
|
||||
.map(|(id, state, gen)| TenantInfo {
|
||||
id: *id,
|
||||
state: state.clone(),
|
||||
current_physical_size: None,
|
||||
attachment_status: state.attachment_status(),
|
||||
generation: (*gen).into(),
|
||||
})
|
||||
.collect::<Vec<TenantInfo>>();
|
||||
|
||||
@@ -789,6 +794,7 @@ async fn tenant_status(
|
||||
state: state.clone(),
|
||||
current_physical_size: Some(current_physical_size),
|
||||
attachment_status: state.attachment_status(),
|
||||
generation: tenant.generation().into(),
|
||||
})
|
||||
}
|
||||
.instrument(info_span!("tenant_status_handler", %tenant_id))
|
||||
@@ -1406,69 +1412,6 @@ async fn timeline_collect_keyspace(
|
||||
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
|
||||
check_permission(&request, Some(tenant_id))?;
|
||||
|
||||
struct Partitioning {
|
||||
keys: crate::keyspace::KeySpace,
|
||||
|
||||
at_lsn: Lsn,
|
||||
}
|
||||
|
||||
impl serde::Serialize for Partitioning {
|
||||
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
use serde::ser::SerializeMap;
|
||||
let mut map = serializer.serialize_map(Some(2))?;
|
||||
map.serialize_key("keys")?;
|
||||
map.serialize_value(&KeySpace(&self.keys))?;
|
||||
map.serialize_key("at_lsn")?;
|
||||
map.serialize_value(&WithDisplay(&self.at_lsn))?;
|
||||
map.end()
|
||||
}
|
||||
}
|
||||
|
||||
struct WithDisplay<'a, T>(&'a T);
|
||||
|
||||
impl<'a, T: std::fmt::Display> serde::Serialize for WithDisplay<'a, T> {
|
||||
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
serializer.collect_str(&self.0)
|
||||
}
|
||||
}
|
||||
|
||||
struct KeySpace<'a>(&'a crate::keyspace::KeySpace);
|
||||
|
||||
impl<'a> serde::Serialize for KeySpace<'a> {
|
||||
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
use serde::ser::SerializeSeq;
|
||||
let mut seq = serializer.serialize_seq(Some(self.0.ranges.len()))?;
|
||||
for kr in &self.0.ranges {
|
||||
seq.serialize_element(&KeyRange(kr))?;
|
||||
}
|
||||
seq.end()
|
||||
}
|
||||
}
|
||||
|
||||
struct KeyRange<'a>(&'a std::ops::Range<crate::repository::Key>);
|
||||
|
||||
impl<'a> serde::Serialize for KeyRange<'a> {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
use serde::ser::SerializeTuple;
|
||||
let mut t = serializer.serialize_tuple(2)?;
|
||||
t.serialize_element(&WithDisplay(&self.0.start))?;
|
||||
t.serialize_element(&WithDisplay(&self.0.end))?;
|
||||
t.end()
|
||||
}
|
||||
}
|
||||
|
||||
let at_lsn: Option<Lsn> = parse_query_param(&request, "at_lsn")?;
|
||||
|
||||
async {
|
||||
@@ -1480,7 +1423,10 @@ async fn timeline_collect_keyspace(
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
json_response(StatusCode::OK, Partitioning { keys, at_lsn })
|
||||
json_response(
|
||||
StatusCode::OK,
|
||||
crate::http::models::Partitioning { keys, at_lsn },
|
||||
)
|
||||
}
|
||||
.instrument(info_span!("timeline_collect_keyspace", %tenant_id, %timeline_id))
|
||||
.await
|
||||
|
||||
@@ -1225,6 +1225,15 @@ pub(crate) static WAL_REDO_TIME: Lazy<Histogram> = Lazy::new(|| {
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static WAL_REDO_WAIT_TIME: Lazy<Histogram> = Lazy::new(|| {
|
||||
register_histogram!(
|
||||
"pageserver_wal_redo_wait_seconds",
|
||||
"Time spent waiting for access to the Postgres WAL redo process",
|
||||
redo_histogram_time_buckets!(),
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static WAL_REDO_RECORDS_HISTOGRAM: Lazy<Histogram> = Lazy::new(|| {
|
||||
register_histogram!(
|
||||
"pageserver_wal_redo_records_histogram",
|
||||
@@ -1919,6 +1928,7 @@ pub fn preinitialize_metrics() {
|
||||
&READ_NUM_FS_LAYERS,
|
||||
&WAIT_LSN_TIME,
|
||||
&WAL_REDO_TIME,
|
||||
&WAL_REDO_WAIT_TIME,
|
||||
&WAL_REDO_RECORDS_HISTOGRAM,
|
||||
&WAL_REDO_BYTES_HISTOGRAM,
|
||||
]
|
||||
|
||||
@@ -512,11 +512,7 @@ impl PageServerHandler {
|
||||
};
|
||||
|
||||
if let Err(e) = &response {
|
||||
// Requests may fail as soon as we are Stopping, even if the Timeline's cancellation token wasn't fired yet,
|
||||
// because wait_lsn etc will drop out
|
||||
// is_stopping(): [`Timeline::flush_and_shutdown`] has entered
|
||||
// is_canceled(): [`Timeline::shutdown`]` has entered
|
||||
if timeline.cancel.is_cancelled() || timeline.is_stopping() {
|
||||
if timeline.cancel.is_cancelled() {
|
||||
// If we fail to fulfil a request during shutdown, which may be _because_ of
|
||||
// shutdown, then do not send the error to the client. Instead just drop the
|
||||
// connection.
|
||||
|
||||
@@ -21,6 +21,7 @@ use serde::{Deserialize, Serialize};
|
||||
use std::collections::{hash_map, HashMap, HashSet};
|
||||
use std::ops::ControlFlow;
|
||||
use std::ops::Range;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, trace, warn};
|
||||
use utils::{bin_ser::BeSer, lsn::Lsn};
|
||||
|
||||
@@ -577,6 +578,7 @@ impl Timeline {
|
||||
pub async fn get_current_logical_size_non_incremental(
|
||||
&self,
|
||||
lsn: Lsn,
|
||||
cancel: CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<u64, CalculateLogicalSizeError> {
|
||||
crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
@@ -588,7 +590,7 @@ impl Timeline {
|
||||
let mut total_size: u64 = 0;
|
||||
for (spcnode, dbnode) in dbdir.dbdirs.keys() {
|
||||
for rel in self.list_rels(*spcnode, *dbnode, lsn, ctx).await? {
|
||||
if self.cancel.is_cancelled() {
|
||||
if cancel.is_cancelled() {
|
||||
return Err(CalculateLogicalSizeError::Cancelled);
|
||||
}
|
||||
let relsize_key = rel_size_to_key(rel);
|
||||
@@ -1698,6 +1700,7 @@ const AUX_FILES_KEY: Key = Key {
|
||||
// Reverse mappings for a few Keys.
|
||||
// These are needed by WAL redo manager.
|
||||
|
||||
/// Guaranteed to return `Ok()` if [[is_rel_block_key]] returns `true` for `key`.
|
||||
pub fn key_to_rel_block(key: Key) -> anyhow::Result<(RelTag, BlockNumber)> {
|
||||
Ok(match key.field1 {
|
||||
0x00 => (
|
||||
@@ -1713,7 +1716,8 @@ pub fn key_to_rel_block(key: Key) -> anyhow::Result<(RelTag, BlockNumber)> {
|
||||
})
|
||||
}
|
||||
|
||||
fn is_rel_block_key(key: Key) -> bool {
|
||||
/// See [[key_to_rel_block]].
|
||||
pub fn is_rel_block_key(key: Key) -> bool {
|
||||
key.field1 == 0x00 && key.field4 != 0
|
||||
}
|
||||
|
||||
|
||||
@@ -1711,6 +1711,10 @@ impl Tenant {
|
||||
self.current_state() == TenantState::Active
|
||||
}
|
||||
|
||||
pub fn generation(&self) -> Generation {
|
||||
self.generation
|
||||
}
|
||||
|
||||
/// Changes tenant status to active, unless shutdown was already requested.
|
||||
///
|
||||
/// `background_jobs_can_start` is an optional barrier set to a value during pageserver startup
|
||||
@@ -1841,13 +1845,7 @@ impl Tenant {
|
||||
timelines.values().for_each(|timeline| {
|
||||
let timeline = Arc::clone(timeline);
|
||||
let span = Span::current();
|
||||
js.spawn(async move {
|
||||
if freeze_and_flush {
|
||||
timeline.flush_and_shutdown().instrument(span).await
|
||||
} else {
|
||||
timeline.shutdown().instrument(span).await
|
||||
}
|
||||
});
|
||||
js.spawn(async move { timeline.shutdown(freeze_and_flush).instrument(span).await });
|
||||
})
|
||||
};
|
||||
tracing::info!("Waiting for timelines...");
|
||||
@@ -4733,7 +4731,7 @@ mod tests {
|
||||
// Keeps uninit mark in place
|
||||
let raw_tline = tline.raw_timeline().unwrap();
|
||||
raw_tline
|
||||
.shutdown()
|
||||
.shutdown(false)
|
||||
.instrument(info_span!("test_shutdown", tenant_id=%raw_tline.tenant_id))
|
||||
.await;
|
||||
std::mem::forget(tline);
|
||||
|
||||
@@ -327,7 +327,7 @@ mod tests {
|
||||
let mut sz: u16 = rng.gen();
|
||||
// Make 50% of the arrays small
|
||||
if rng.gen() {
|
||||
sz &= 63;
|
||||
sz |= 63;
|
||||
}
|
||||
random_array(sz.into())
|
||||
})
|
||||
|
||||
@@ -566,10 +566,8 @@ pub(crate) async fn shutdown_all_tenants() {
|
||||
async fn shutdown_all_tenants0(tenants: &std::sync::RwLock<TenantsMap>) {
|
||||
use utils::completion;
|
||||
|
||||
let mut join_set = JoinSet::new();
|
||||
|
||||
// Atomically, 1. create the shutdown tasks and 2. prevent creation of new tenants.
|
||||
let (total_in_progress, total_attached) = {
|
||||
// Atomically, 1. extract the list of tenants to shut down and 2. prevent creation of new tenants.
|
||||
let (in_progress_ops, tenants_to_shut_down) = {
|
||||
let mut m = tenants.write().unwrap();
|
||||
match &mut *m {
|
||||
TenantsMap::Initializing => {
|
||||
@@ -579,67 +577,78 @@ async fn shutdown_all_tenants0(tenants: &std::sync::RwLock<TenantsMap>) {
|
||||
}
|
||||
TenantsMap::Open(tenants) => {
|
||||
let mut shutdown_state = HashMap::new();
|
||||
let mut total_in_progress = 0;
|
||||
let mut total_attached = 0;
|
||||
let mut in_progress_ops = Vec::new();
|
||||
let mut tenants_to_shut_down = Vec::new();
|
||||
|
||||
for (tenant_id, v) in tenants.drain() {
|
||||
for (k, v) in tenants.drain() {
|
||||
match v {
|
||||
TenantSlot::Attached(t) => {
|
||||
shutdown_state.insert(tenant_id, TenantSlot::Attached(t.clone()));
|
||||
join_set.spawn(
|
||||
async move {
|
||||
let freeze_and_flush = true;
|
||||
|
||||
let res = {
|
||||
let (_guard, shutdown_progress) = completion::channel();
|
||||
t.shutdown(shutdown_progress, freeze_and_flush).await
|
||||
};
|
||||
|
||||
if let Err(other_progress) = res {
|
||||
// join the another shutdown in progress
|
||||
other_progress.wait().await;
|
||||
}
|
||||
|
||||
// we cannot afford per tenant logging here, because if s3 is degraded, we are
|
||||
// going to log too many lines
|
||||
debug!("tenant successfully stopped");
|
||||
}
|
||||
.instrument(info_span!("shutdown", %tenant_id)),
|
||||
);
|
||||
|
||||
total_attached += 1;
|
||||
tenants_to_shut_down.push(t.clone());
|
||||
shutdown_state.insert(k, TenantSlot::Attached(t));
|
||||
}
|
||||
TenantSlot::Secondary => {
|
||||
shutdown_state.insert(tenant_id, TenantSlot::Secondary);
|
||||
shutdown_state.insert(k, TenantSlot::Secondary);
|
||||
}
|
||||
TenantSlot::InProgress(notify) => {
|
||||
// InProgress tenants are not visible in TenantsMap::ShuttingDown: we will
|
||||
// wait for their notifications to fire in this function.
|
||||
join_set.spawn(async move {
|
||||
notify.wait().await;
|
||||
});
|
||||
|
||||
total_in_progress += 1;
|
||||
in_progress_ops.push(notify);
|
||||
}
|
||||
}
|
||||
}
|
||||
*m = TenantsMap::ShuttingDown(shutdown_state);
|
||||
(total_in_progress, total_attached)
|
||||
(in_progress_ops, tenants_to_shut_down)
|
||||
}
|
||||
TenantsMap::ShuttingDown(_) => {
|
||||
// TODO: it is possible that detach and shutdown happen at the same time. as a
|
||||
// result, during shutdown we do not wait for detach.
|
||||
error!("already shutting down, this function isn't supposed to be called more than once");
|
||||
return;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let started_at = std::time::Instant::now();
|
||||
|
||||
info!(
|
||||
"Waiting for {} InProgress tenants and {} Attached tenants to shut down",
|
||||
total_in_progress, total_attached
|
||||
in_progress_ops.len(),
|
||||
tenants_to_shut_down.len()
|
||||
);
|
||||
|
||||
for barrier in in_progress_ops {
|
||||
barrier.wait().await;
|
||||
}
|
||||
|
||||
info!(
|
||||
"InProgress tenants shut down, waiting for {} Attached tenants to shut down",
|
||||
tenants_to_shut_down.len()
|
||||
);
|
||||
let started_at = std::time::Instant::now();
|
||||
let mut join_set = JoinSet::new();
|
||||
for tenant in tenants_to_shut_down {
|
||||
let tenant_id = tenant.get_tenant_id();
|
||||
join_set.spawn(
|
||||
async move {
|
||||
let freeze_and_flush = true;
|
||||
|
||||
let res = {
|
||||
let (_guard, shutdown_progress) = completion::channel();
|
||||
tenant.shutdown(shutdown_progress, freeze_and_flush).await
|
||||
};
|
||||
|
||||
if let Err(other_progress) = res {
|
||||
// join the another shutdown in progress
|
||||
other_progress.wait().await;
|
||||
}
|
||||
|
||||
// we cannot afford per tenant logging here, because if s3 is degraded, we are
|
||||
// going to log too many lines
|
||||
|
||||
debug!("tenant successfully stopped");
|
||||
}
|
||||
.instrument(info_span!("shutdown", %tenant_id)),
|
||||
);
|
||||
}
|
||||
|
||||
let total = join_set.len();
|
||||
let mut panicked = 0;
|
||||
let mut buffering = true;
|
||||
@@ -652,7 +661,7 @@ async fn shutdown_all_tenants0(tenants: &std::sync::RwLock<TenantsMap>) {
|
||||
match joined {
|
||||
Ok(()) => {}
|
||||
Err(join_error) if join_error.is_cancelled() => {
|
||||
unreachable!("we are not cancelling any of the tasks");
|
||||
unreachable!("we are not cancelling any of the futures");
|
||||
}
|
||||
Err(join_error) if join_error.is_panic() => {
|
||||
// cannot really do anything, as this panic is likely a bug
|
||||
@@ -1285,7 +1294,8 @@ pub(crate) enum TenantMapListError {
|
||||
///
|
||||
/// Get list of tenants, for the mgmt API
|
||||
///
|
||||
pub(crate) async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, TenantMapListError> {
|
||||
pub(crate) async fn list_tenants(
|
||||
) -> Result<Vec<(TenantId, TenantState, Generation)>, TenantMapListError> {
|
||||
let tenants = TENANTS.read().unwrap();
|
||||
let m = match &*tenants {
|
||||
TenantsMap::Initializing => return Err(TenantMapListError::Initializing),
|
||||
@@ -1293,7 +1303,9 @@ pub(crate) async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, Tenan
|
||||
};
|
||||
Ok(m.iter()
|
||||
.filter_map(|(id, tenant)| match tenant {
|
||||
TenantSlot::Attached(tenant) => Some((*id, tenant.current_state())),
|
||||
TenantSlot::Attached(tenant) => {
|
||||
Some((*id, tenant.current_state(), tenant.generation()))
|
||||
}
|
||||
TenantSlot::Secondary => None,
|
||||
TenantSlot::InProgress(_) => None,
|
||||
})
|
||||
|
||||
@@ -6,6 +6,7 @@ use std::sync::Arc;
|
||||
use anyhow::{bail, Context};
|
||||
use tokio::sync::oneshot::error::RecvError;
|
||||
use tokio::sync::Semaphore;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use crate::context::RequestContext;
|
||||
use crate::pgdatadir_mapping::CalculateLogicalSizeError;
|
||||
@@ -349,6 +350,10 @@ async fn fill_logical_sizes(
|
||||
// our advantage with `?` error handling.
|
||||
let mut joinset = tokio::task::JoinSet::new();
|
||||
|
||||
let cancel = tokio_util::sync::CancellationToken::new();
|
||||
// be sure to cancel all spawned tasks if we are dropped
|
||||
let _dg = cancel.clone().drop_guard();
|
||||
|
||||
// For each point that would benefit from having a logical size available,
|
||||
// spawn a Task to fetch it, unless we have it cached already.
|
||||
for seg in segments.iter() {
|
||||
@@ -366,8 +371,15 @@ async fn fill_logical_sizes(
|
||||
let parallel_size_calcs = Arc::clone(limit);
|
||||
let ctx = ctx.attached_child();
|
||||
joinset.spawn(
|
||||
calculate_logical_size(parallel_size_calcs, timeline, lsn, cause, ctx)
|
||||
.in_current_span(),
|
||||
calculate_logical_size(
|
||||
parallel_size_calcs,
|
||||
timeline,
|
||||
lsn,
|
||||
cause,
|
||||
ctx,
|
||||
cancel.child_token(),
|
||||
)
|
||||
.in_current_span(),
|
||||
);
|
||||
}
|
||||
e.insert(cached_size);
|
||||
@@ -475,13 +487,14 @@ async fn calculate_logical_size(
|
||||
lsn: utils::lsn::Lsn,
|
||||
cause: LogicalSizeCalculationCause,
|
||||
ctx: RequestContext,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<TimelineAtLsnSizeResult, RecvError> {
|
||||
let _permit = tokio::sync::Semaphore::acquire_owned(limit)
|
||||
.await
|
||||
.expect("global semaphore should not had been closed");
|
||||
|
||||
let size_res = timeline
|
||||
.spawn_ondemand_logical_size_calculation(lsn, cause, ctx)
|
||||
.spawn_ondemand_logical_size_calculation(lsn, cause, ctx, cancel)
|
||||
.instrument(info_span!("spawn_ondemand_logical_size_calculation"))
|
||||
.await?;
|
||||
Ok(TimelineAtLsnSizeResult(timeline, lsn, size_res))
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
pub mod delta_layer;
|
||||
mod filename;
|
||||
mod image_layer;
|
||||
pub mod image_layer;
|
||||
mod inmemory_layer;
|
||||
mod layer;
|
||||
mod layer_desc;
|
||||
|
||||
@@ -69,13 +69,13 @@ use super::{AsLayerDesc, LayerAccessStats, PersistentLayerDesc, ResidentLayer};
|
||||
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
|
||||
pub struct Summary {
|
||||
/// Magic value to identify this as a neon delta file. Always DELTA_FILE_MAGIC.
|
||||
magic: u16,
|
||||
format_version: u16,
|
||||
pub magic: u16,
|
||||
pub format_version: u16,
|
||||
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
key_range: Range<Key>,
|
||||
lsn_range: Range<Lsn>,
|
||||
pub tenant_id: TenantId,
|
||||
pub timeline_id: TimelineId,
|
||||
pub key_range: Range<Key>,
|
||||
pub lsn_range: Range<Lsn>,
|
||||
|
||||
/// Block number where the 'index' part of the file begins.
|
||||
pub index_start_blk: u32,
|
||||
@@ -609,6 +609,62 @@ impl Drop for DeltaLayerWriter {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum RewriteSummaryError {
|
||||
#[error("magic mismatch")]
|
||||
MagicMismatch,
|
||||
#[error(transparent)]
|
||||
Other(#[from] anyhow::Error),
|
||||
}
|
||||
|
||||
impl From<std::io::Error> for RewriteSummaryError {
|
||||
fn from(e: std::io::Error) -> Self {
|
||||
Self::Other(anyhow::anyhow!(e))
|
||||
}
|
||||
}
|
||||
|
||||
impl DeltaLayer {
|
||||
pub async fn rewrite_summary<F>(
|
||||
path: &Utf8Path,
|
||||
rewrite: F,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), RewriteSummaryError>
|
||||
where
|
||||
F: Fn(Summary) -> Summary,
|
||||
{
|
||||
let file = VirtualFile::open_with_options(
|
||||
path,
|
||||
&*std::fs::OpenOptions::new().read(true).write(true),
|
||||
)
|
||||
.await
|
||||
.with_context(|| format!("Failed to open file '{}'", path))?;
|
||||
let file = FileBlockReader::new(file);
|
||||
let summary_blk = file.read_blk(0, ctx).await?;
|
||||
let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
|
||||
let mut file = file.file;
|
||||
if actual_summary.magic != DELTA_FILE_MAGIC {
|
||||
return Err(RewriteSummaryError::MagicMismatch);
|
||||
}
|
||||
|
||||
let new_summary = rewrite(actual_summary);
|
||||
|
||||
let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new();
|
||||
Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
|
||||
if buf.spilled() {
|
||||
// The code in DeltaLayerWriterInner just warn!()s for this.
|
||||
// It should probably error out as well.
|
||||
return Err(RewriteSummaryError::Other(anyhow::anyhow!(
|
||||
"Used more than one page size for summary buffer: {}",
|
||||
buf.len()
|
||||
)));
|
||||
}
|
||||
file.seek(SeekFrom::Start(0)).await?;
|
||||
file.write_all(&buf).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl DeltaLayerInner {
|
||||
pub(super) async fn load(
|
||||
path: &Utf8Path,
|
||||
|
||||
@@ -67,20 +67,20 @@ use super::{AsLayerDesc, Layer, PersistentLayerDesc, ResidentLayer};
|
||||
/// the 'index' starts at the block indicated by 'index_start_blk'
|
||||
///
|
||||
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
|
||||
pub(super) struct Summary {
|
||||
pub struct Summary {
|
||||
/// Magic value to identify this as a neon image file. Always IMAGE_FILE_MAGIC.
|
||||
magic: u16,
|
||||
format_version: u16,
|
||||
pub magic: u16,
|
||||
pub format_version: u16,
|
||||
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
key_range: Range<Key>,
|
||||
lsn: Lsn,
|
||||
pub tenant_id: TenantId,
|
||||
pub timeline_id: TimelineId,
|
||||
pub key_range: Range<Key>,
|
||||
pub lsn: Lsn,
|
||||
|
||||
/// Block number where the 'index' part of the file begins.
|
||||
index_start_blk: u32,
|
||||
pub index_start_blk: u32,
|
||||
/// Block within the 'index', where the B-tree root page is stored
|
||||
index_root_blk: u32,
|
||||
pub index_root_blk: u32,
|
||||
// the 'values' part starts after the summary header, on block 1.
|
||||
}
|
||||
|
||||
@@ -294,6 +294,61 @@ impl ImageLayer {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum RewriteSummaryError {
|
||||
#[error("magic mismatch")]
|
||||
MagicMismatch,
|
||||
#[error(transparent)]
|
||||
Other(#[from] anyhow::Error),
|
||||
}
|
||||
|
||||
impl From<std::io::Error> for RewriteSummaryError {
|
||||
fn from(e: std::io::Error) -> Self {
|
||||
Self::Other(anyhow::anyhow!(e))
|
||||
}
|
||||
}
|
||||
|
||||
impl ImageLayer {
|
||||
pub async fn rewrite_summary<F>(
|
||||
path: &Utf8Path,
|
||||
rewrite: F,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), RewriteSummaryError>
|
||||
where
|
||||
F: Fn(Summary) -> Summary,
|
||||
{
|
||||
let file = VirtualFile::open_with_options(
|
||||
path,
|
||||
&*std::fs::OpenOptions::new().read(true).write(true),
|
||||
)
|
||||
.await
|
||||
.with_context(|| format!("Failed to open file '{}'", path))?;
|
||||
let file = FileBlockReader::new(file);
|
||||
let summary_blk = file.read_blk(0, ctx).await?;
|
||||
let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
|
||||
let mut file = file.file;
|
||||
if actual_summary.magic != IMAGE_FILE_MAGIC {
|
||||
return Err(RewriteSummaryError::MagicMismatch);
|
||||
}
|
||||
|
||||
let new_summary = rewrite(actual_summary);
|
||||
|
||||
let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new();
|
||||
Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
|
||||
if buf.spilled() {
|
||||
// The code in ImageLayerWriterInner just warn!()s for this.
|
||||
// It should probably error out as well.
|
||||
return Err(RewriteSummaryError::Other(anyhow::anyhow!(
|
||||
"Used more than one page size for summary buffer: {}",
|
||||
buf.len()
|
||||
)));
|
||||
}
|
||||
file.seek(SeekFrom::Start(0)).await?;
|
||||
file.write_all(&buf).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl ImageLayerInner {
|
||||
pub(super) async fn load(
|
||||
path: &Utf8Path,
|
||||
|
||||
@@ -3,6 +3,7 @@ use camino::{Utf8Path, Utf8PathBuf};
|
||||
use pageserver_api::models::{
|
||||
HistoricLayerInfo, LayerAccessKind, LayerResidenceEventReason, LayerResidenceStatus,
|
||||
};
|
||||
use remote_storage::RemotePath;
|
||||
use std::ops::Range;
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, Weak};
|
||||
@@ -251,7 +252,6 @@ impl Layer {
|
||||
|
||||
layer
|
||||
.get_value_reconstruct_data(key, lsn_range, reconstruct_data, &self.0, ctx)
|
||||
.instrument(tracing::info_span!("get_value_reconstruct_data", layer=%self))
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -305,6 +305,12 @@ impl Layer {
|
||||
&self.0.path
|
||||
}
|
||||
|
||||
/// This can return None even though it should return Some in some edge cases.
|
||||
#[allow(unused)]
|
||||
pub(crate) fn remote_path(&self) -> Option<RemotePath> {
|
||||
self.0.remote_path()
|
||||
}
|
||||
|
||||
pub(crate) fn metadata(&self) -> LayerFileMetadata {
|
||||
self.0.metadata()
|
||||
}
|
||||
@@ -915,6 +921,17 @@ impl LayerInner {
|
||||
}
|
||||
}
|
||||
|
||||
/// This can return None even though it should return Some in some edge cases.
|
||||
fn remote_path(&self) -> Option<RemotePath> {
|
||||
let tl = self.timeline.upgrade()?; // TODO: should distinguish this case, but, accuracy doesn't matter for this field.
|
||||
Some(crate::tenant::remote_timeline_client::remote_layer_path(
|
||||
&tl.tenant_id,
|
||||
&tl.timeline_id,
|
||||
&self.desc.filename(),
|
||||
self.generation,
|
||||
))
|
||||
}
|
||||
|
||||
fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
|
||||
let layer_file_name = self.desc.filename().file_name();
|
||||
|
||||
@@ -934,6 +951,7 @@ impl LayerInner {
|
||||
lsn_end: lsn_range.end,
|
||||
remote,
|
||||
access_stats,
|
||||
remote_path: self.remote_path().map(|p| p.into()),
|
||||
}
|
||||
} else {
|
||||
let lsn = self.desc.image_layer_lsn();
|
||||
@@ -944,6 +962,7 @@ impl LayerInner {
|
||||
lsn_start: lsn,
|
||||
remote,
|
||||
access_stats,
|
||||
remote_path: self.remote_path().map(|p| p.into()),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1212,10 +1231,8 @@ impl DownloadedLayer {
|
||||
// this will be a permanent failure
|
||||
.context("load layer");
|
||||
|
||||
if let Err(e) = res.as_ref() {
|
||||
if res.is_err() {
|
||||
LAYER_IMPL_METRICS.inc_permanent_loading_failures();
|
||||
// TODO(#5815): we are not logging all errors, so temporarily log them here as well
|
||||
tracing::error!("layer loading failed permanently: {e:#}");
|
||||
}
|
||||
res
|
||||
};
|
||||
@@ -1294,7 +1311,6 @@ impl ResidentLayer {
|
||||
}
|
||||
|
||||
/// Loads all keys stored in the layer. Returns key, lsn and value size.
|
||||
#[tracing::instrument(skip_all, fields(layer=%self))]
|
||||
pub(crate) async fn load_keys<'a>(
|
||||
&'a self,
|
||||
ctx: &RequestContext,
|
||||
|
||||
@@ -36,6 +36,7 @@ use std::time::{Duration, Instant, SystemTime};
|
||||
use crate::context::{
|
||||
AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder,
|
||||
};
|
||||
use crate::deletion_queue::DeletionQueueClient;
|
||||
use crate::tenant::storage_layer::delta_layer::DeltaEntry;
|
||||
use crate::tenant::storage_layer::{
|
||||
AsLayerDesc, DeltaLayerWriter, EvictionError, ImageLayerWriter, InMemoryLayer, Layer,
|
||||
@@ -49,7 +50,6 @@ use crate::tenant::{
|
||||
metadata::{save_metadata, TimelineMetadata},
|
||||
par_fsync,
|
||||
};
|
||||
use crate::{deletion_queue::DeletionQueueClient, tenant::remote_timeline_client::StopError};
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
use crate::keyspace::{KeyPartitioning, KeySpace, KeySpaceRandomAccum};
|
||||
@@ -247,7 +247,7 @@ pub struct Timeline {
|
||||
/// the flush finishes. You can use that to wait for the flush to finish.
|
||||
layer_flush_start_tx: tokio::sync::watch::Sender<u64>,
|
||||
/// to be notified when layer flushing has finished, subscribe to the layer_flush_done channel
|
||||
layer_flush_done_tx: tokio::sync::watch::Sender<(u64, Result<(), FlushLayerError>)>,
|
||||
layer_flush_done_tx: tokio::sync::watch::Sender<(u64, anyhow::Result<()>)>,
|
||||
|
||||
/// Layer removal lock.
|
||||
/// A lock to ensure that no layer of the timeline is removed concurrently by other tasks.
|
||||
@@ -374,19 +374,6 @@ pub enum PageReconstructError {
|
||||
WalRedo(anyhow::Error),
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
enum FlushLayerError {
|
||||
/// Timeline cancellation token was cancelled
|
||||
#[error("timeline shutting down")]
|
||||
Cancelled,
|
||||
|
||||
#[error(transparent)]
|
||||
PageReconstructError(#[from] PageReconstructError),
|
||||
|
||||
#[error(transparent)]
|
||||
Other(#[from] anyhow::Error),
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for PageReconstructError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
|
||||
match self {
|
||||
@@ -904,16 +891,15 @@ impl Timeline {
|
||||
self.launch_eviction_task(background_jobs_can_start);
|
||||
}
|
||||
|
||||
/// Graceful shutdown, may do a lot of I/O as we flush any open layers to disk and then
|
||||
/// also to remote storage. This method can easily take multiple seconds for a busy timeline.
|
||||
///
|
||||
/// While we are flushing, we continue to accept read I/O.
|
||||
#[instrument(skip_all, fields(timeline_id=%self.timeline_id))]
|
||||
pub(crate) async fn flush_and_shutdown(&self) {
|
||||
pub async fn shutdown(self: &Arc<Self>, freeze_and_flush: bool) {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
// Stop ingesting data, so that we are not still writing to an InMemoryLayer while
|
||||
// trying to flush
|
||||
// Signal any subscribers to our cancellation token to drop out
|
||||
tracing::debug!("Cancelling CancellationToken");
|
||||
self.cancel.cancel();
|
||||
|
||||
// prevent writes to the InMemoryLayer
|
||||
tracing::debug!("Waiting for WalReceiverManager...");
|
||||
task_mgr::shutdown_tasks(
|
||||
Some(TaskKind::WalReceiverManager),
|
||||
@@ -922,70 +908,40 @@ impl Timeline {
|
||||
)
|
||||
.await;
|
||||
|
||||
// Since we have shut down WAL ingest, we should not let anyone start waiting for the LSN to advance
|
||||
self.last_record_lsn.shutdown();
|
||||
|
||||
// now all writers to InMemory layer are gone, do the final flush if requested
|
||||
match self.freeze_and_flush().await {
|
||||
Ok(_) => {
|
||||
// drain the upload queue
|
||||
if let Some(client) = self.remote_client.as_ref() {
|
||||
// if we did not wait for completion here, it might be our shutdown process
|
||||
// didn't wait for remote uploads to complete at all, as new tasks can forever
|
||||
// be spawned.
|
||||
//
|
||||
// what is problematic is the shutting down of RemoteTimelineClient, because
|
||||
// obviously it does not make sense to stop while we wait for it, but what
|
||||
// about corner cases like s3 suddenly hanging up?
|
||||
if let Err(e) = client.wait_completion().await {
|
||||
// Non-fatal. Shutdown is infallible. Failures to flush just mean that
|
||||
// we have some extra WAL replay to do next time the timeline starts.
|
||||
warn!("failed to flush to remote storage: {e:#}");
|
||||
}
|
||||
if freeze_and_flush {
|
||||
match self.freeze_and_flush().await {
|
||||
Ok(()) => {}
|
||||
Err(e) => {
|
||||
warn!("failed to freeze and flush: {e:#}");
|
||||
return; // TODO: should probably drain remote timeline client anyways?
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
// Non-fatal. Shutdown is infallible. Failures to flush just mean that
|
||||
// we have some extra WAL replay to do next time the timeline starts.
|
||||
warn!("failed to freeze and flush: {e:#}");
|
||||
|
||||
// drain the upload queue
|
||||
let res = if let Some(client) = self.remote_client.as_ref() {
|
||||
// if we did not wait for completion here, it might be our shutdown process
|
||||
// didn't wait for remote uploads to complete at all, as new tasks can forever
|
||||
// be spawned.
|
||||
//
|
||||
// what is problematic is the shutting down of RemoteTimelineClient, because
|
||||
// obviously it does not make sense to stop while we wait for it, but what
|
||||
// about corner cases like s3 suddenly hanging up?
|
||||
client.wait_completion().await
|
||||
} else {
|
||||
Ok(())
|
||||
};
|
||||
|
||||
if let Err(e) = res {
|
||||
warn!("failed to await for frozen and flushed uploads: {e:#}");
|
||||
}
|
||||
}
|
||||
|
||||
self.shutdown().await;
|
||||
}
|
||||
|
||||
/// Shut down immediately, without waiting for any open layers to flush to disk. This is a subset of
|
||||
/// the graceful [`Timeline::flush_and_shutdown`] function.
|
||||
pub(crate) async fn shutdown(&self) {
|
||||
// Signal any subscribers to our cancellation token to drop out
|
||||
tracing::debug!("Cancelling CancellationToken");
|
||||
self.cancel.cancel();
|
||||
|
||||
// Page request handlers might be waiting for LSN to advance: they do not respect Timeline::cancel
|
||||
// while doing so.
|
||||
self.last_record_lsn.shutdown();
|
||||
|
||||
// Shut down the layer flush task before the remote client, as one depends on the other
|
||||
task_mgr::shutdown_tasks(
|
||||
Some(TaskKind::LayerFlushTask),
|
||||
Some(self.tenant_id),
|
||||
Some(self.timeline_id),
|
||||
)
|
||||
.await;
|
||||
|
||||
// Shut down remote timeline client: this gracefully moves its metadata into its Stopping state in
|
||||
// case our caller wants to use that for a deletion
|
||||
if let Some(remote_client) = self.remote_client.as_ref() {
|
||||
match remote_client.stop() {
|
||||
Ok(()) => {}
|
||||
Err(StopError::QueueUninitialized) => {
|
||||
// Shutting down during initialization is legal
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tracing::debug!("Waiting for tasks...");
|
||||
|
||||
task_mgr::shutdown_tasks(None, Some(self.tenant_id), Some(self.timeline_id)).await;
|
||||
|
||||
// Finally wait until any gate-holders are complete
|
||||
@@ -1029,12 +985,7 @@ impl Timeline {
|
||||
reason,
|
||||
backtrace: backtrace_str,
|
||||
};
|
||||
self.set_state(broken_state);
|
||||
|
||||
// Although the Broken state is not equivalent to shutdown() (shutdown will be called
|
||||
// later when this tenant is detach or the process shuts down), firing the cancellation token
|
||||
// here avoids the need for other tasks to watch for the Broken state explicitly.
|
||||
self.cancel.cancel();
|
||||
self.set_state(broken_state)
|
||||
}
|
||||
|
||||
pub fn current_state(&self) -> TimelineState {
|
||||
@@ -1790,8 +1741,12 @@ impl Timeline {
|
||||
// delay will be terminated by a timeout regardless.
|
||||
let _completion = { self_clone.initial_logical_size_attempt.lock().expect("unexpected initial_logical_size_attempt poisoned").take() };
|
||||
|
||||
// no extra cancellation here, because nothing really waits for this to complete compared
|
||||
// to spawn_ondemand_logical_size_calculation.
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
let calculated_size = match self_clone
|
||||
.logical_size_calculation_task(lsn, LogicalSizeCalculationCause::Initial, &background_ctx)
|
||||
.logical_size_calculation_task(lsn, LogicalSizeCalculationCause::Initial, &background_ctx, cancel)
|
||||
.await
|
||||
{
|
||||
Ok(s) => s,
|
||||
@@ -1860,6 +1815,7 @@ impl Timeline {
|
||||
lsn: Lsn,
|
||||
cause: LogicalSizeCalculationCause,
|
||||
ctx: RequestContext,
|
||||
cancel: CancellationToken,
|
||||
) -> oneshot::Receiver<Result<u64, CalculateLogicalSizeError>> {
|
||||
let (sender, receiver) = oneshot::channel();
|
||||
let self_clone = Arc::clone(self);
|
||||
@@ -1880,7 +1836,7 @@ impl Timeline {
|
||||
false,
|
||||
async move {
|
||||
let res = self_clone
|
||||
.logical_size_calculation_task(lsn, cause, &ctx)
|
||||
.logical_size_calculation_task(lsn, cause, &ctx, cancel)
|
||||
.await;
|
||||
let _ = sender.send(res).ok();
|
||||
Ok(()) // Receiver is responsible for handling errors
|
||||
@@ -1896,28 +1852,58 @@ impl Timeline {
|
||||
lsn: Lsn,
|
||||
cause: LogicalSizeCalculationCause,
|
||||
ctx: &RequestContext,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<u64, CalculateLogicalSizeError> {
|
||||
span::debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
let _guard = self.gate.enter();
|
||||
|
||||
let mut timeline_state_updates = self.subscribe_for_state_updates();
|
||||
let self_calculation = Arc::clone(self);
|
||||
|
||||
let mut calculation = pin!(async {
|
||||
let cancel = cancel.child_token();
|
||||
let ctx = ctx.attached_child();
|
||||
self_calculation
|
||||
.calculate_logical_size(lsn, cause, &ctx)
|
||||
.calculate_logical_size(lsn, cause, cancel, &ctx)
|
||||
.await
|
||||
});
|
||||
let timeline_state_cancellation = async {
|
||||
loop {
|
||||
match timeline_state_updates.changed().await {
|
||||
Ok(()) => {
|
||||
let new_state = timeline_state_updates.borrow().clone();
|
||||
match new_state {
|
||||
// we're running this job for active timelines only
|
||||
TimelineState::Active => continue,
|
||||
TimelineState::Broken { .. }
|
||||
| TimelineState::Stopping
|
||||
| TimelineState::Loading => {
|
||||
break format!("aborted because timeline became inactive (new state: {new_state:?})")
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(_sender_dropped_error) => {
|
||||
// can't happen, the sender is not dropped as long as the Timeline exists
|
||||
break "aborted because state watch was dropped".to_string();
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let taskmgr_shutdown_cancellation = async {
|
||||
task_mgr::shutdown_watcher().await;
|
||||
"aborted because task_mgr shutdown requested".to_string()
|
||||
};
|
||||
|
||||
tokio::select! {
|
||||
res = &mut calculation => { res }
|
||||
_ = self.cancel.cancelled() => {
|
||||
debug!("cancelling logical size calculation for timeline shutdown");
|
||||
reason = timeline_state_cancellation => {
|
||||
debug!(reason = reason, "cancelling calculation");
|
||||
cancel.cancel();
|
||||
calculation.await
|
||||
}
|
||||
_ = task_mgr::shutdown_watcher() => {
|
||||
debug!("cancelling logical size calculation for task shutdown");
|
||||
reason = taskmgr_shutdown_cancellation => {
|
||||
debug!(reason = reason, "cancelling calculation");
|
||||
cancel.cancel();
|
||||
calculation.await
|
||||
}
|
||||
}
|
||||
@@ -1931,6 +1917,7 @@ impl Timeline {
|
||||
&self,
|
||||
up_to_lsn: Lsn,
|
||||
cause: LogicalSizeCalculationCause,
|
||||
cancel: CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<u64, CalculateLogicalSizeError> {
|
||||
info!(
|
||||
@@ -1973,7 +1960,7 @@ impl Timeline {
|
||||
};
|
||||
let timer = storage_time_metrics.start_timer();
|
||||
let logical_size = self
|
||||
.get_current_logical_size_non_incremental(up_to_lsn, ctx)
|
||||
.get_current_logical_size_non_incremental(up_to_lsn, cancel, ctx)
|
||||
.await?;
|
||||
debug!("calculated logical size: {logical_size}");
|
||||
timer.stop_and_record();
|
||||
@@ -2386,10 +2373,6 @@ impl Timeline {
|
||||
info!("started flush loop");
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = self.cancel.cancelled() => {
|
||||
info!("shutting down layer flush task");
|
||||
break;
|
||||
},
|
||||
_ = task_mgr::shutdown_watcher() => {
|
||||
info!("shutting down layer flush task");
|
||||
break;
|
||||
@@ -2401,14 +2384,6 @@ impl Timeline {
|
||||
let timer = self.metrics.flush_time_histo.start_timer();
|
||||
let flush_counter = *layer_flush_start_rx.borrow();
|
||||
let result = loop {
|
||||
if self.cancel.is_cancelled() {
|
||||
info!("dropping out of flush loop for timeline shutdown");
|
||||
// Note: we do not bother transmitting into [`layer_flush_done_tx`], because
|
||||
// anyone waiting on that will respect self.cancel as well: they will stop
|
||||
// waiting at the same time we as drop out of this loop.
|
||||
return;
|
||||
}
|
||||
|
||||
let layer_to_flush = {
|
||||
let guard = self.layers.read().await;
|
||||
guard.layer_map().frozen_layers.front().cloned()
|
||||
@@ -2417,18 +2392,9 @@ impl Timeline {
|
||||
let Some(layer_to_flush) = layer_to_flush else {
|
||||
break Ok(());
|
||||
};
|
||||
match self.flush_frozen_layer(layer_to_flush, ctx).await {
|
||||
Ok(()) => {}
|
||||
Err(FlushLayerError::Cancelled) => {
|
||||
info!("dropping out of flush loop for timeline shutdown");
|
||||
return;
|
||||
}
|
||||
err @ Err(
|
||||
FlushLayerError::Other(_) | FlushLayerError::PageReconstructError(_),
|
||||
) => {
|
||||
error!("could not flush frozen layer: {err:?}");
|
||||
break err;
|
||||
}
|
||||
if let Err(err) = self.flush_frozen_layer(layer_to_flush, ctx).await {
|
||||
error!("could not flush frozen layer: {err:?}");
|
||||
break Err(err);
|
||||
}
|
||||
};
|
||||
// Notify any listeners that we're done
|
||||
@@ -2477,17 +2443,7 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
trace!("waiting for flush to complete");
|
||||
tokio::select! {
|
||||
rx_e = rx.changed() => {
|
||||
rx_e?;
|
||||
},
|
||||
// Cancellation safety: we are not leaving an I/O in-flight for the flush, we're just ignoring
|
||||
// the notification from [`flush_loop`] that it completed.
|
||||
_ = self.cancel.cancelled() => {
|
||||
tracing::info!("Cancelled layer flush due on timeline shutdown");
|
||||
return Ok(())
|
||||
}
|
||||
};
|
||||
rx.changed().await?;
|
||||
trace!("done")
|
||||
}
|
||||
}
|
||||
@@ -2502,7 +2458,7 @@ impl Timeline {
|
||||
self: &Arc<Self>,
|
||||
frozen_layer: Arc<InMemoryLayer>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), FlushLayerError> {
|
||||
) -> anyhow::Result<()> {
|
||||
// As a special case, when we have just imported an image into the repository,
|
||||
// instead of writing out a L0 delta layer, we directly write out image layer
|
||||
// files instead. This is possible as long as *all* the data imported into the
|
||||
@@ -2527,11 +2483,6 @@ impl Timeline {
|
||||
let (partitioning, _lsn) = self
|
||||
.repartition(self.initdb_lsn, self.get_compaction_target_size(), ctx)
|
||||
.await?;
|
||||
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(FlushLayerError::Cancelled);
|
||||
}
|
||||
|
||||
// For image layers, we add them immediately into the layer map.
|
||||
(
|
||||
self.create_image_layers(&partitioning, self.initdb_lsn, true, ctx)
|
||||
@@ -2563,10 +2514,6 @@ impl Timeline {
|
||||
)
|
||||
};
|
||||
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(FlushLayerError::Cancelled);
|
||||
}
|
||||
|
||||
let disk_consistent_lsn = Lsn(lsn_range.end.0 - 1);
|
||||
let old_disk_consistent_lsn = self.disk_consistent_lsn.load();
|
||||
|
||||
@@ -2576,10 +2523,6 @@ impl Timeline {
|
||||
let metadata = {
|
||||
let mut guard = self.layers.write().await;
|
||||
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(FlushLayerError::Cancelled);
|
||||
}
|
||||
|
||||
guard.finish_flush_l0_layer(delta_layer_to_add.as_ref(), &frozen_layer, &self.metrics);
|
||||
|
||||
if disk_consistent_lsn != old_disk_consistent_lsn {
|
||||
|
||||
@@ -326,7 +326,8 @@ impl Timeline {
|
||||
match state.last_layer_access_imitation {
|
||||
Some(ts) if ts.elapsed() < inter_imitate_period => { /* no need to run */ }
|
||||
_ => {
|
||||
self.imitate_timeline_cached_layer_accesses(ctx).await;
|
||||
self.imitate_timeline_cached_layer_accesses(cancel, ctx)
|
||||
.await;
|
||||
state.last_layer_access_imitation = Some(tokio::time::Instant::now())
|
||||
}
|
||||
}
|
||||
@@ -366,12 +367,21 @@ impl Timeline {
|
||||
|
||||
/// Recompute the values which would cause on-demand downloads during restart.
|
||||
#[instrument(skip_all)]
|
||||
async fn imitate_timeline_cached_layer_accesses(&self, ctx: &RequestContext) {
|
||||
async fn imitate_timeline_cached_layer_accesses(
|
||||
&self,
|
||||
cancel: &CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) {
|
||||
let lsn = self.get_last_record_lsn();
|
||||
|
||||
// imitiate on-restart initial logical size
|
||||
let size = self
|
||||
.calculate_logical_size(lsn, LogicalSizeCalculationCause::EvictionTaskImitation, ctx)
|
||||
.calculate_logical_size(
|
||||
lsn,
|
||||
LogicalSizeCalculationCause::EvictionTaskImitation,
|
||||
cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.instrument(info_span!("calculate_logical_size"))
|
||||
.await;
|
||||
|
||||
|
||||
@@ -44,6 +44,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use crate::config::PageServerConf;
|
||||
use crate::metrics::{
|
||||
WAL_REDO_BYTES_HISTOGRAM, WAL_REDO_RECORDS_HISTOGRAM, WAL_REDO_RECORD_COUNTER, WAL_REDO_TIME,
|
||||
WAL_REDO_WAIT_TIME,
|
||||
};
|
||||
use crate::pgdatadir_mapping::{key_to_rel_block, key_to_slru_block};
|
||||
use crate::repository::Key;
|
||||
@@ -206,8 +207,11 @@ impl PostgresRedoManager {
|
||||
) -> anyhow::Result<Bytes> {
|
||||
let (rel, blknum) = key_to_rel_block(key).context("invalid record")?;
|
||||
const MAX_RETRY_ATTEMPTS: u32 = 1;
|
||||
let start_time = Instant::now();
|
||||
let mut n_attempts = 0u32;
|
||||
loop {
|
||||
let lock_time = Instant::now();
|
||||
|
||||
// launch the WAL redo process on first use
|
||||
let proc: Arc<WalRedoProcess> = {
|
||||
let proc_guard = self.redo_process.read().unwrap();
|
||||
@@ -232,7 +236,7 @@ impl PostgresRedoManager {
|
||||
}
|
||||
};
|
||||
|
||||
let started_at = std::time::Instant::now();
|
||||
WAL_REDO_WAIT_TIME.observe(lock_time.duration_since(start_time).as_secs_f64());
|
||||
|
||||
// Relational WAL records are applied using wal-redo-postgres
|
||||
let buf_tag = BufferTag { rel, blknum };
|
||||
@@ -240,7 +244,8 @@ impl PostgresRedoManager {
|
||||
.apply_wal_records(buf_tag, &base_img, records, wal_redo_timeout)
|
||||
.context("apply_wal_records");
|
||||
|
||||
let duration = started_at.elapsed();
|
||||
let end_time = Instant::now();
|
||||
let duration = end_time.duration_since(lock_time);
|
||||
|
||||
let len = records.len();
|
||||
let nbytes = records.iter().fold(0, |acumulator, record| {
|
||||
|
||||
@@ -80,9 +80,6 @@ struct ProxyCliArgs {
|
||||
/// cache for `wake_compute` api method (use `size=0` to disable)
|
||||
#[clap(long, default_value = config::CacheOptions::DEFAULT_OPTIONS_NODE_INFO)]
|
||||
wake_compute_cache: String,
|
||||
/// lock for `wake_compute` api method. example: "shards=32,permits=4,epoch=10m,timeout=1s". (use `permits=0` to disable).
|
||||
#[clap(long, default_value = config::WakeComputeLockOptions::DEFAULT_OPTIONS_WAKE_COMPUTE_LOCK)]
|
||||
wake_compute_lock: String,
|
||||
/// Allow self-signed certificates for compute nodes (for testing)
|
||||
#[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
|
||||
allow_self_signed_compute: bool,
|
||||
@@ -223,23 +220,10 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
|
||||
node_info: console::caches::NodeInfoCache::new("node_info_cache", size, ttl),
|
||||
}));
|
||||
|
||||
let config::WakeComputeLockOptions {
|
||||
shards,
|
||||
permits,
|
||||
epoch,
|
||||
timeout,
|
||||
} = args.wake_compute_lock.parse()?;
|
||||
info!(permits, shards, ?epoch, "Using NodeLocks (wake_compute)");
|
||||
let locks = Box::leak(Box::new(
|
||||
console::locks::ApiLocks::new("wake_compute_lock", permits, shards, timeout)
|
||||
.unwrap(),
|
||||
));
|
||||
tokio::spawn(locks.garbage_collect_worker(epoch));
|
||||
|
||||
let url = args.auth_endpoint.parse()?;
|
||||
let endpoint = http::Endpoint::new(url, http::new_client());
|
||||
|
||||
let api = console::provider::neon::Api::new(endpoint, caches, locks);
|
||||
let api = console::provider::neon::Api::new(endpoint, caches);
|
||||
auth::BackendType::Console(Cow::Owned(api), ())
|
||||
}
|
||||
AuthBackend::Postgres => {
|
||||
|
||||
@@ -264,79 +264,6 @@ impl FromStr for CacheOptions {
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper for cmdline cache options parsing.
|
||||
pub struct WakeComputeLockOptions {
|
||||
/// The number of shards the lock map should have
|
||||
pub shards: usize,
|
||||
/// The number of allowed concurrent requests for each endpoitn
|
||||
pub permits: usize,
|
||||
/// Garbage collection epoch
|
||||
pub epoch: Duration,
|
||||
/// Lock timeout
|
||||
pub timeout: Duration,
|
||||
}
|
||||
|
||||
impl WakeComputeLockOptions {
|
||||
/// Default options for [`crate::console::provider::ApiLocks`].
|
||||
pub const DEFAULT_OPTIONS_WAKE_COMPUTE_LOCK: &'static str = "permits=0";
|
||||
|
||||
// pub const DEFAULT_OPTIONS_WAKE_COMPUTE_LOCK: &'static str = "shards=32,permits=4,epoch=10m,timeout=1s";
|
||||
|
||||
/// Parse lock options passed via cmdline.
|
||||
/// Example: [`Self::DEFAULT_OPTIONS_WAKE_COMPUTE_LOCK`].
|
||||
fn parse(options: &str) -> anyhow::Result<Self> {
|
||||
let mut shards = None;
|
||||
let mut permits = None;
|
||||
let mut epoch = None;
|
||||
let mut timeout = None;
|
||||
|
||||
for option in options.split(',') {
|
||||
let (key, value) = option
|
||||
.split_once('=')
|
||||
.with_context(|| format!("bad key-value pair: {option}"))?;
|
||||
|
||||
match key {
|
||||
"shards" => shards = Some(value.parse()?),
|
||||
"permits" => permits = Some(value.parse()?),
|
||||
"epoch" => epoch = Some(humantime::parse_duration(value)?),
|
||||
"timeout" => timeout = Some(humantime::parse_duration(value)?),
|
||||
unknown => bail!("unknown key: {unknown}"),
|
||||
}
|
||||
}
|
||||
|
||||
// these dont matter if lock is disabled
|
||||
if let Some(0) = permits {
|
||||
timeout = Some(Duration::default());
|
||||
epoch = Some(Duration::default());
|
||||
shards = Some(2);
|
||||
}
|
||||
|
||||
let out = Self {
|
||||
shards: shards.context("missing `shards`")?,
|
||||
permits: permits.context("missing `permits`")?,
|
||||
epoch: epoch.context("missing `epoch`")?,
|
||||
timeout: timeout.context("missing `timeout`")?,
|
||||
};
|
||||
|
||||
ensure!(out.shards > 1, "shard count must be > 1");
|
||||
ensure!(
|
||||
out.shards.is_power_of_two(),
|
||||
"shard count must be a power of two"
|
||||
);
|
||||
|
||||
Ok(out)
|
||||
}
|
||||
}
|
||||
|
||||
impl FromStr for WakeComputeLockOptions {
|
||||
type Err = anyhow::Error;
|
||||
|
||||
fn from_str(options: &str) -> Result<Self, Self::Err> {
|
||||
let error = || format!("failed to parse cache lock options '{options}'");
|
||||
Self::parse(options).with_context(error)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
@@ -361,42 +288,4 @@ mod tests {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_lock_options() -> anyhow::Result<()> {
|
||||
let WakeComputeLockOptions {
|
||||
epoch,
|
||||
permits,
|
||||
shards,
|
||||
timeout,
|
||||
} = "shards=32,permits=4,epoch=10m,timeout=1s".parse()?;
|
||||
assert_eq!(epoch, Duration::from_secs(10 * 60));
|
||||
assert_eq!(timeout, Duration::from_secs(1));
|
||||
assert_eq!(shards, 32);
|
||||
assert_eq!(permits, 4);
|
||||
|
||||
let WakeComputeLockOptions {
|
||||
epoch,
|
||||
permits,
|
||||
shards,
|
||||
timeout,
|
||||
} = "epoch=60s,shards=16,timeout=100ms,permits=8".parse()?;
|
||||
assert_eq!(epoch, Duration::from_secs(60));
|
||||
assert_eq!(timeout, Duration::from_millis(100));
|
||||
assert_eq!(shards, 16);
|
||||
assert_eq!(permits, 8);
|
||||
|
||||
let WakeComputeLockOptions {
|
||||
epoch,
|
||||
permits,
|
||||
shards,
|
||||
timeout,
|
||||
} = "permits=0".parse()?;
|
||||
assert_eq!(epoch, Duration::ZERO);
|
||||
assert_eq!(timeout, Duration::ZERO);
|
||||
assert_eq!(shards, 2);
|
||||
assert_eq!(permits, 0);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,10 +13,5 @@ pub mod caches {
|
||||
pub use super::provider::{ApiCaches, NodeInfoCache};
|
||||
}
|
||||
|
||||
/// Various cache-related types.
|
||||
pub mod locks {
|
||||
pub use super::provider::ApiLocks;
|
||||
}
|
||||
|
||||
/// Console's management API.
|
||||
pub mod mgmt;
|
||||
|
||||
@@ -8,13 +8,7 @@ use crate::{
|
||||
compute, scram,
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
use dashmap::DashMap;
|
||||
use std::{sync::Arc, time::Duration};
|
||||
use tokio::{
|
||||
sync::{OwnedSemaphorePermit, Semaphore},
|
||||
time::Instant,
|
||||
};
|
||||
use tracing::info;
|
||||
use std::sync::Arc;
|
||||
|
||||
pub mod errors {
|
||||
use crate::{
|
||||
@@ -155,9 +149,6 @@ pub mod errors {
|
||||
|
||||
#[error(transparent)]
|
||||
ApiError(ApiError),
|
||||
|
||||
#[error("Timeout waiting to acquire wake compute lock")]
|
||||
TimeoutError,
|
||||
}
|
||||
|
||||
// This allows more useful interactions than `#[from]`.
|
||||
@@ -167,17 +158,6 @@ pub mod errors {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<tokio::sync::AcquireError> for WakeComputeError {
|
||||
fn from(_: tokio::sync::AcquireError) -> Self {
|
||||
WakeComputeError::TimeoutError
|
||||
}
|
||||
}
|
||||
impl From<tokio::time::error::Elapsed> for WakeComputeError {
|
||||
fn from(_: tokio::time::error::Elapsed) -> Self {
|
||||
WakeComputeError::TimeoutError
|
||||
}
|
||||
}
|
||||
|
||||
impl UserFacingError for WakeComputeError {
|
||||
fn to_string_client(&self) -> String {
|
||||
use WakeComputeError::*;
|
||||
@@ -187,8 +167,6 @@ pub mod errors {
|
||||
BadComputeAddress(_) => REQUEST_FAILED.to_owned(),
|
||||
// However, API might return a meaningful error.
|
||||
ApiError(e) => e.to_string_client(),
|
||||
|
||||
TimeoutError => "timeout while acquiring the compute resource lock".to_owned(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -255,145 +233,3 @@ pub struct ApiCaches {
|
||||
/// Cache for the `wake_compute` API method.
|
||||
pub node_info: NodeInfoCache,
|
||||
}
|
||||
|
||||
/// Various caches for [`console`](super).
|
||||
pub struct ApiLocks {
|
||||
name: &'static str,
|
||||
node_locks: DashMap<Arc<str>, Arc<Semaphore>>,
|
||||
permits: usize,
|
||||
timeout: Duration,
|
||||
registered: prometheus::IntCounter,
|
||||
unregistered: prometheus::IntCounter,
|
||||
reclamation_lag: prometheus::Histogram,
|
||||
lock_acquire_lag: prometheus::Histogram,
|
||||
}
|
||||
|
||||
impl ApiLocks {
|
||||
pub fn new(
|
||||
name: &'static str,
|
||||
permits: usize,
|
||||
shards: usize,
|
||||
timeout: Duration,
|
||||
) -> prometheus::Result<Self> {
|
||||
let registered = prometheus::IntCounter::with_opts(
|
||||
prometheus::Opts::new(
|
||||
"semaphores_registered",
|
||||
"Number of semaphores registered in this api lock",
|
||||
)
|
||||
.namespace(name),
|
||||
)?;
|
||||
prometheus::register(Box::new(registered.clone()))?;
|
||||
let unregistered = prometheus::IntCounter::with_opts(
|
||||
prometheus::Opts::new(
|
||||
"semaphores_unregistered",
|
||||
"Number of semaphores unregistered in this api lock",
|
||||
)
|
||||
.namespace(name),
|
||||
)?;
|
||||
prometheus::register(Box::new(unregistered.clone()))?;
|
||||
let reclamation_lag = prometheus::Histogram::with_opts(
|
||||
prometheus::HistogramOpts::new(
|
||||
"reclamation_lag_seconds",
|
||||
"Time it takes to reclaim unused semaphores in the api lock",
|
||||
)
|
||||
.namespace(name)
|
||||
// 1us -> 65ms
|
||||
// benchmarks on my mac indicate it's usually in the range of 256us and 512us
|
||||
.buckets(prometheus::exponential_buckets(1e-6, 2.0, 16)?),
|
||||
)?;
|
||||
prometheus::register(Box::new(reclamation_lag.clone()))?;
|
||||
let lock_acquire_lag = prometheus::Histogram::with_opts(
|
||||
prometheus::HistogramOpts::new(
|
||||
"semaphore_acquire_seconds",
|
||||
"Time it takes to reclaim unused semaphores in the api lock",
|
||||
)
|
||||
.namespace(name)
|
||||
// 0.1ms -> 6s
|
||||
.buckets(prometheus::exponential_buckets(1e-4, 2.0, 16)?),
|
||||
)?;
|
||||
prometheus::register(Box::new(lock_acquire_lag.clone()))?;
|
||||
|
||||
Ok(Self {
|
||||
name,
|
||||
node_locks: DashMap::with_shard_amount(shards),
|
||||
permits,
|
||||
timeout,
|
||||
lock_acquire_lag,
|
||||
registered,
|
||||
unregistered,
|
||||
reclamation_lag,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn get_wake_compute_permit(
|
||||
&self,
|
||||
key: &Arc<str>,
|
||||
) -> Result<WakeComputePermit, errors::WakeComputeError> {
|
||||
if self.permits == 0 {
|
||||
return Ok(WakeComputePermit { permit: None });
|
||||
}
|
||||
let now = Instant::now();
|
||||
let semaphore = {
|
||||
// get fast path
|
||||
if let Some(semaphore) = self.node_locks.get(key) {
|
||||
semaphore.clone()
|
||||
} else {
|
||||
self.node_locks
|
||||
.entry(key.clone())
|
||||
.or_insert_with(|| {
|
||||
self.registered.inc();
|
||||
Arc::new(Semaphore::new(self.permits))
|
||||
})
|
||||
.clone()
|
||||
}
|
||||
};
|
||||
let permit = tokio::time::timeout_at(now + self.timeout, semaphore.acquire_owned()).await;
|
||||
|
||||
self.lock_acquire_lag
|
||||
.observe((Instant::now() - now).as_secs_f64());
|
||||
|
||||
Ok(WakeComputePermit {
|
||||
permit: Some(permit??),
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn garbage_collect_worker(&self, epoch: std::time::Duration) {
|
||||
if self.permits == 0 {
|
||||
return;
|
||||
}
|
||||
|
||||
let mut interval = tokio::time::interval(epoch / (self.node_locks.shards().len()) as u32);
|
||||
loop {
|
||||
for (i, shard) in self.node_locks.shards().iter().enumerate() {
|
||||
interval.tick().await;
|
||||
// temporary lock a single shard and then clear any semaphores that aren't currently checked out
|
||||
// race conditions: if strong_count == 1, there's no way that it can increase while the shard is locked
|
||||
// therefore releasing it is safe from race conditions
|
||||
info!(
|
||||
name = self.name,
|
||||
shard = i,
|
||||
"performing epoch reclamation on api lock"
|
||||
);
|
||||
let mut lock = shard.write();
|
||||
let timer = self.reclamation_lag.start_timer();
|
||||
let count = lock
|
||||
.extract_if(|_, semaphore| Arc::strong_count(semaphore.get_mut()) == 1)
|
||||
.count();
|
||||
drop(lock);
|
||||
self.unregistered.inc_by(count as u64);
|
||||
timer.observe_duration()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct WakeComputePermit {
|
||||
// None if the lock is disabled
|
||||
permit: Option<OwnedSemaphorePermit>,
|
||||
}
|
||||
|
||||
impl WakeComputePermit {
|
||||
pub fn should_check_cache(&self) -> bool {
|
||||
self.permit.is_some()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,12 +3,12 @@
|
||||
use super::{
|
||||
super::messages::{ConsoleError, GetRoleSecret, WakeCompute},
|
||||
errors::{ApiError, GetAuthInfoError, WakeComputeError},
|
||||
ApiCaches, ApiLocks, AuthInfo, CachedNodeInfo, ConsoleReqExtra, NodeInfo,
|
||||
ApiCaches, AuthInfo, CachedNodeInfo, ConsoleReqExtra, NodeInfo,
|
||||
};
|
||||
use crate::{auth::ClientCredentials, compute, http, scram};
|
||||
use async_trait::async_trait;
|
||||
use futures::TryFutureExt;
|
||||
use std::{net::SocketAddr, sync::Arc};
|
||||
use std::net::SocketAddr;
|
||||
use tokio::time::Instant;
|
||||
use tokio_postgres::config::SslMode;
|
||||
use tracing::{error, info, info_span, warn, Instrument};
|
||||
@@ -17,17 +17,12 @@ use tracing::{error, info, info_span, warn, Instrument};
|
||||
pub struct Api {
|
||||
endpoint: http::Endpoint,
|
||||
caches: &'static ApiCaches,
|
||||
locks: &'static ApiLocks,
|
||||
jwt: String,
|
||||
}
|
||||
|
||||
impl Api {
|
||||
/// Construct an API object containing the auth parameters.
|
||||
pub fn new(
|
||||
endpoint: http::Endpoint,
|
||||
caches: &'static ApiCaches,
|
||||
locks: &'static ApiLocks,
|
||||
) -> Self {
|
||||
pub fn new(endpoint: http::Endpoint, caches: &'static ApiCaches) -> Self {
|
||||
let jwt: String = match std::env::var("NEON_PROXY_TO_CONTROLPLANE_TOKEN") {
|
||||
Ok(v) => v,
|
||||
Err(_) => "".to_string(),
|
||||
@@ -35,7 +30,6 @@ impl Api {
|
||||
Self {
|
||||
endpoint,
|
||||
caches,
|
||||
locks,
|
||||
jwt,
|
||||
}
|
||||
}
|
||||
@@ -169,22 +163,9 @@ impl super::Api for Api {
|
||||
return Ok(cached);
|
||||
}
|
||||
|
||||
let key: Arc<str> = key.into();
|
||||
|
||||
let permit = self.locks.get_wake_compute_permit(&key).await?;
|
||||
|
||||
// after getting back a permit - it's possible the cache was filled
|
||||
// double check
|
||||
if permit.should_check_cache() {
|
||||
if let Some(cached) = self.caches.node_info.get(&key) {
|
||||
info!(key = &*key, "found cached compute node info");
|
||||
return Ok(cached);
|
||||
}
|
||||
}
|
||||
|
||||
let node = self.do_wake_compute(extra, creds).await?;
|
||||
let (_, cached) = self.caches.node_info.insert(key.clone(), node);
|
||||
info!(key = &*key, "created a cache entry for compute node info");
|
||||
let (_, cached) = self.caches.node_info.insert(key.into(), node);
|
||||
info!(key = key, "created a cache entry for compute node info");
|
||||
|
||||
Ok(cached)
|
||||
}
|
||||
|
||||
@@ -570,7 +570,6 @@ fn report_error(e: &WakeComputeError, retry: bool) {
|
||||
"api_console_other_server_error"
|
||||
}
|
||||
WakeComputeError::ApiError(ApiError::Console { .. }) => "api_console_other_error",
|
||||
WakeComputeError::TimeoutError => "timeout_error",
|
||||
};
|
||||
NUM_WAKEUP_FAILURES.with_label_values(&[retry, kind]).inc();
|
||||
}
|
||||
|
||||
@@ -202,6 +202,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
logging::init(
|
||||
LogFormat::from_config(&args.log_format)?,
|
||||
logging::TracingErrorLayerEnablement::Disabled,
|
||||
logging::Output::Stdout,
|
||||
)?;
|
||||
logging::replace_panic_hook_with_tracing_panic_hook().forget();
|
||||
info!("version: {GIT_VERSION}");
|
||||
|
||||
@@ -434,6 +434,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
logging::init(
|
||||
LogFormat::from_config(&args.log_format)?,
|
||||
logging::TracingErrorLayerEnablement::Disabled,
|
||||
logging::Output::Stdout,
|
||||
)?;
|
||||
logging::replace_panic_hook_with_tracing_panic_hook().forget();
|
||||
// initialize sentry if SENTRY_DSN is provided
|
||||
|
||||
69
test_runner/duplicate_tenant.py
Normal file
69
test_runner/duplicate_tenant.py
Normal file
@@ -0,0 +1,69 @@
|
||||
# Usage from top of repo:
|
||||
# poetry run python3 ./test_runner/duplicate_tenant.py c66e2e233057f7f05563caff664ecb14 .neon/remote_storage_local_fs
|
||||
from pathlib import Path
|
||||
import shutil
|
||||
import subprocess
|
||||
import time
|
||||
|
||||
from fixtures.pageserver.http import PageserverHttpClient
|
||||
from fixtures.types import TenantId
|
||||
import argparse
|
||||
|
||||
parser = argparse.ArgumentParser(description="Duplicate tenant script.")
|
||||
parser.add_argument("initial_tenant", type=str, help="Initial tenant")
|
||||
parser.add_argument("remote_storage_local_fs_root", type=Path, help="Remote storage local fs root")
|
||||
parser.add_argument("--ncopies", type=int, help="Number of copies")
|
||||
parser.add_argument("--numthreads", type=int, default=1, help="Number of threads")
|
||||
parser.add_argument("--port", type=int, default=9898, help="Pageserver management api port")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
initial_tenant = args.initial_tenant
|
||||
remote_storage_local_fs_root: Path = args.remote_storage_local_fs_root
|
||||
ncopies = args.ncopies
|
||||
numthreads = args.numthreads
|
||||
|
||||
new_tenant = TenantId.generate()
|
||||
print(f"New tenant: {new_tenant}")
|
||||
|
||||
client = PageserverHttpClient(args.port, lambda: None)
|
||||
|
||||
src_tenant_gen = int(client.tenant_status(initial_tenant)["generation"])
|
||||
|
||||
assert remote_storage_local_fs_root.is_dir(), f"{remote_storage_local_fs_root} is not a directory"
|
||||
|
||||
src_timelines_dir: Path = remote_storage_local_fs_root / "tenants" / initial_tenant / "timelines"
|
||||
assert src_timelines_dir.is_dir(), f"{src_timelines_dir} is not a directory"
|
||||
|
||||
dst_timelines_dir: Path = remote_storage_local_fs_root / "tenants" / str(new_tenant) / "timelines"
|
||||
dst_timelines_dir.parent.mkdir(parents=False, exist_ok=False)
|
||||
dst_timelines_dir.mkdir(parents=False, exist_ok=False)
|
||||
|
||||
for tl in src_timelines_dir.iterdir():
|
||||
src_tl_dir = src_timelines_dir / tl.name
|
||||
assert src_tl_dir.is_dir(), f"{src_tl_dir} is not a directory"
|
||||
dst_tl_dir = dst_timelines_dir / tl.name
|
||||
dst_tl_dir.mkdir(parents=False, exist_ok=False)
|
||||
for file in tl.iterdir():
|
||||
shutil.copy2(file, dst_tl_dir)
|
||||
if "__" in file.name:
|
||||
cmd = [
|
||||
"./target/debug/pagectl", # TODO: abstract this like the other binaries
|
||||
"layer",
|
||||
"rewrite-summary",
|
||||
str(dst_tl_dir / file.name),
|
||||
"--new-tenant-id",
|
||||
str(new_tenant),
|
||||
]
|
||||
subprocess.run(cmd, check=True)
|
||||
|
||||
client.tenant_attach(new_tenant, generation=src_tenant_gen)
|
||||
|
||||
while True:
|
||||
status = client.tenant_status(new_tenant)
|
||||
if status["state"]["slug"] == "Active":
|
||||
break
|
||||
print("Waiting for tenant to be active..., is: " + status["state"]["slug"])
|
||||
time.sleep(1)
|
||||
|
||||
print("Tenant is active: " + str(new_tenant))
|
||||
@@ -724,13 +724,10 @@ class NeonEnv:
|
||||
self.initial_tenant = config.initial_tenant
|
||||
self.initial_timeline = config.initial_timeline
|
||||
|
||||
self.control_plane_api = None
|
||||
self.attachment_service = None
|
||||
if config.enable_generations:
|
||||
attachment_service_port = self.port_distributor.get_port()
|
||||
self.control_plane_api: Optional[str] = f"http://127.0.0.1:{attachment_service_port}"
|
||||
self.attachment_service: Optional[NeonAttachmentService] = NeonAttachmentService(self)
|
||||
else:
|
||||
self.control_plane_api = None
|
||||
self.attachment_service = None
|
||||
self.enable_generations()
|
||||
|
||||
# Create a config file corresponding to the options
|
||||
toml = textwrap.dedent(
|
||||
@@ -819,6 +816,18 @@ class NeonEnv:
|
||||
log.info(f"Config: {toml}")
|
||||
self.neon_cli.init(toml)
|
||||
|
||||
def enable_generations(self, start=False):
|
||||
if not start:
|
||||
# TODO: assert that we haven't `self.start()`ed yet
|
||||
pass
|
||||
assert self.control_plane_api == None
|
||||
assert self.attachment_service == None
|
||||
attachment_service_port = self.port_distributor.get_port()
|
||||
self.control_plane_api: Optional[str] = f"http://127.0.0.1:{attachment_service_port}"
|
||||
self.attachment_service: Optional[NeonAttachmentService] = NeonAttachmentService(self)
|
||||
if start:
|
||||
self.attachment_service.start()
|
||||
|
||||
def start(self):
|
||||
# Start up broker, pageserver and all safekeepers
|
||||
self.broker.try_start()
|
||||
|
||||
@@ -58,6 +58,7 @@ class HistoricLayerInfo:
|
||||
lsn_start: str
|
||||
lsn_end: Optional[str]
|
||||
remote: bool
|
||||
remote_path: Optional[str] = None
|
||||
|
||||
@classmethod
|
||||
def from_json(cls, d: Dict[str, Any]) -> HistoricLayerInfo:
|
||||
@@ -68,6 +69,7 @@ class HistoricLayerInfo:
|
||||
lsn_start=d["lsn_start"],
|
||||
lsn_end=d.get("lsn_end"),
|
||||
remote=d["remote"],
|
||||
remote_path=d.get("remote_path"),
|
||||
)
|
||||
|
||||
|
||||
|
||||
107
test_runner/performance/test_pageserver.py
Normal file
107
test_runner/performance/test_pageserver.py
Normal file
@@ -0,0 +1,107 @@
|
||||
|
||||
from pathlib import Path
|
||||
import shutil
|
||||
import subprocess
|
||||
from fixtures.compare_fixtures import NeonCompare
|
||||
from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, last_flush_lsn_upload
|
||||
from fixtures.pageserver.utils import wait_until_tenant_active
|
||||
from fixtures.types import TenantId
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.benchmark_fixture import NeonBenchmarker
|
||||
|
||||
def test_getpage_throughput(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker, pg_bin: PgBin):
|
||||
neon_env_builder.enable_generations = True
|
||||
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
remote_storage = env.pageserver_remote_storage
|
||||
assert isinstance(remote_storage, LocalFsStorage)
|
||||
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
# clean up the useless default tenant
|
||||
ps_http.tenant_delete(env.initial_tenant)
|
||||
|
||||
# create our template tenant
|
||||
tenant_config_mgmt_api = {
|
||||
"gc_period" : '0s',
|
||||
"checkpoint_timeout" : '3650 day',
|
||||
"compaction_period" : '20 s',
|
||||
"compaction_threshold" : 10,
|
||||
"compaction_target_size" : 134217728,
|
||||
"checkpoint_distance" : 268435456,
|
||||
"image_creation_threshold" : 3,
|
||||
}
|
||||
tenant_config_cli = { k: str(v) for k, v in tenant_config_mgmt_api.items() }
|
||||
|
||||
template_tenant, template_timeline = env.neon_cli.create_tenant(conf=tenant_config_cli)
|
||||
template_tenant_gen = int(ps_http.tenant_status(template_tenant)["generation"])
|
||||
with env.endpoints.create_start("main", tenant_id=template_tenant) as ep:
|
||||
pg_bin.run_capture(["pgbench", "-i", "-s1", ep.connstr()])
|
||||
last_flush_lsn_upload(env, ep, template_tenant, template_timeline)
|
||||
ps_http.tenant_detach(template_tenant)
|
||||
|
||||
# stop PS just for good measure
|
||||
env.pageserver.stop()
|
||||
|
||||
# duplicate the tenant in remote stora
|
||||
src_timelines_dir: Path = remote_storage.tenant_path(template_tenant) / "timelines"
|
||||
assert src_timelines_dir.is_dir(), f"{src_timelines_dir} is not a directory"
|
||||
|
||||
tenants = [template_tenant]
|
||||
|
||||
for i in range(0, 10):
|
||||
new_tenant = TenantId.generate()
|
||||
tenants.append(new_tenant)
|
||||
log.info("Duplicating tenant #%s: %s", i, new_tenant)
|
||||
|
||||
|
||||
dst_timelines_dir: Path = remote_storage.tenant_path(new_tenant) / "timelines"
|
||||
dst_timelines_dir.parent.mkdir(parents=False, exist_ok=False)
|
||||
dst_timelines_dir.mkdir(parents=False, exist_ok=False)
|
||||
|
||||
for tl in src_timelines_dir.iterdir():
|
||||
src_tl_dir = src_timelines_dir / tl.name
|
||||
assert src_tl_dir.is_dir(), f"{src_tl_dir} is not a directory"
|
||||
dst_tl_dir = dst_timelines_dir / tl.name
|
||||
dst_tl_dir.mkdir(parents=False, exist_ok=False)
|
||||
for file in tl.iterdir():
|
||||
shutil.copy2(file, dst_tl_dir)
|
||||
if "__" in file.name:
|
||||
cmd = [
|
||||
env.neon_binpath / "pagectl", # TODO: abstract this like the other binaries
|
||||
"layer",
|
||||
"rewrite-summary",
|
||||
str(dst_tl_dir / file.name),
|
||||
"--new-tenant-id",
|
||||
str(new_tenant),
|
||||
]
|
||||
subprocess.run(cmd, check=True)
|
||||
else:
|
||||
# index_part etc need no patching
|
||||
pass
|
||||
|
||||
env.pageserver.start()
|
||||
assert ps_http.tenant_list() == []
|
||||
for tenant in tenants:
|
||||
ps_http.tenant_attach(tenant, config=tenant_config_mgmt_api, generation=template_tenant_gen+1)
|
||||
for tenant in tenants:
|
||||
wait_until_tenant_active(ps_http, tenant)
|
||||
|
||||
# ensure all layers are resident for predictiable performance
|
||||
# TODO: ensure all kinds of eviction are disabled (per-tenant, disk-usage-based)
|
||||
for tenant in tenants:
|
||||
ps_http.download_all_layers(tenant, template_timeline)
|
||||
|
||||
# run the benchmark
|
||||
cmd = [
|
||||
str(env.neon_binpath / "getpage_bench_libpq"),
|
||||
"--mgmt-api-endpoint", ps_http.base_url,
|
||||
"--page-service-connstring", env.pageserver.connstr(password=None),
|
||||
"--num-tasks", "1",
|
||||
"--num-requests", "10000",
|
||||
*[str(tenant) for tenant in tenants],
|
||||
]
|
||||
basepath = pg_bin.run_capture(cmd)
|
||||
log.info("Benchmark results: %s", basepath + ".stdout")
|
||||
@@ -26,7 +26,6 @@ def test_local_corruption(neon_env_builder: NeonEnvBuilder):
|
||||
".*will not become active. Current state: Broken.*",
|
||||
".*failed to load metadata.*",
|
||||
".*load failed.*load local timeline.*",
|
||||
".*layer loading failed permanently: load layer: .*",
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
@@ -25,7 +25,6 @@ chrono = { version = "0.4", default-features = false, features = ["clock", "serd
|
||||
clap = { version = "4", features = ["derive", "string"] }
|
||||
clap_builder = { version = "4", default-features = false, features = ["color", "help", "std", "string", "suggestions", "usage"] }
|
||||
crossbeam-utils = { version = "0.8" }
|
||||
dashmap = { version = "5", default-features = false, features = ["raw-api"] }
|
||||
either = { version = "1" }
|
||||
fail = { version = "0.5", default-features = false, features = ["failpoints"] }
|
||||
futures = { version = "0.3" }
|
||||
|
||||
Reference in New Issue
Block a user