Compare commits

..

1 Commits

Author SHA1 Message Date
Bojan Serafimov
2f7c2b9826 Skip sync safekeepers (draft) 2023-07-24 07:52:36 -04:00
41 changed files with 410 additions and 801 deletions

3
Cargo.lock generated
View File

@@ -2506,7 +2506,6 @@ dependencies = [
"pageserver",
"postgres_ffi",
"svg_fmt",
"tokio",
"utils",
"workspace_hack",
]
@@ -2545,7 +2544,6 @@ dependencies = [
"metrics",
"nix",
"num-traits",
"num_cpus",
"once_cell",
"pageserver_api",
"pin-project-lite",
@@ -4869,7 +4867,6 @@ dependencies = [
"tempfile",
"thiserror",
"tokio",
"tokio-stream",
"tracing",
"tracing-error",
"tracing-subscriber",

View File

@@ -199,8 +199,8 @@ RUN wget https://github.com/df7cb/postgresql-unit/archive/refs/tags/7.7.tar.gz -
FROM build-deps AS vector-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
# Use custom branch with HNSW index support
RUN wget https://github.com/pgvector/pgvector/archive/refs/heads/hnsw.tar.gz -O pgvector.tar.gz && \
RUN wget https://github.com/pgvector/pgvector/archive/refs/tags/v0.4.4.tar.gz -O pgvector.tar.gz && \
echo "1cb70a63f8928e396474796c22a20be9f7285a8a013009deb8152445b61b72e6 pgvector.tar.gz" | sha256sum --check && \
mkdir pgvector-src && cd pgvector-src && tar xvzf ../pgvector.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \

View File

@@ -223,8 +223,9 @@ fn main() -> Result<()> {
drop(state);
// Launch remaining service threads
let _monitor_handle = launch_monitor(&compute);
let _configurator_handle = launch_configurator(&compute);
let _monitor_handle = launch_monitor(&compute).expect("cannot launch compute monitor thread");
let _configurator_handle =
launch_configurator(&compute).expect("cannot launch configurator thread");
// Start Postgres
let mut delay_exit = false;

View File

@@ -372,9 +372,13 @@ impl ComputeNode {
let lsn = match spec.mode {
ComputeMode::Primary => {
info!("starting safekeepers syncing");
let lsn = self
let lsn = if let Some(lsn) = self.check_safekeepers_synced() {
lsn
} else {
self
.sync_safekeepers(pspec.storage_auth_token.clone())
.with_context(|| "failed to sync safekeepers")?;
};
info!("safekeepers synced at LSN {}", lsn);
lsn
}

View File

@@ -1,6 +1,7 @@
use std::sync::Arc;
use std::thread;
use anyhow::Result;
use tracing::{error, info, instrument};
use compute_api::responses::ComputeStatus;
@@ -41,14 +42,13 @@ fn configurator_main_loop(compute: &Arc<ComputeNode>) {
}
}
pub fn launch_configurator(compute: &Arc<ComputeNode>) -> thread::JoinHandle<()> {
pub fn launch_configurator(compute: &Arc<ComputeNode>) -> Result<thread::JoinHandle<()>> {
let compute = Arc::clone(compute);
thread::Builder::new()
Ok(thread::Builder::new()
.name("compute-configurator".into())
.spawn(move || {
configurator_main_loop(&compute);
info!("configurator thread is exited");
})
.expect("cannot launch configurator thread")
})?)
}

View File

@@ -1,6 +1,7 @@
use std::sync::Arc;
use std::{thread, time};
use anyhow::Result;
use chrono::{DateTime, Utc};
use postgres::{Client, NoTls};
use tracing::{debug, info};
@@ -104,11 +105,10 @@ fn watch_compute_activity(compute: &ComputeNode) {
}
/// Launch a separate compute monitor thread and return its `JoinHandle`.
pub fn launch_monitor(state: &Arc<ComputeNode>) -> thread::JoinHandle<()> {
pub fn launch_monitor(state: &Arc<ComputeNode>) -> Result<thread::JoinHandle<()>> {
let state = Arc::clone(state);
thread::Builder::new()
Ok(thread::Builder::new()
.name("compute-monitor".into())
.spawn(move || watch_compute_activity(&state))
.expect("cannot launch compute monitor thread")
.spawn(move || watch_compute_activity(&state))?)
}

View File

@@ -6,7 +6,6 @@ use once_cell::sync::Lazy;
use prometheus::core::{AtomicU64, Collector, GenericGauge, GenericGaugeVec};
pub use prometheus::opts;
pub use prometheus::register;
pub use prometheus::Error;
pub use prometheus::{core, default_registry, proto};
pub use prometheus::{exponential_buckets, linear_buckets};
pub use prometheus::{register_counter_vec, Counter, CounterVec};

View File

@@ -57,9 +57,9 @@ pub fn slru_may_delete_clogsegment(segpage: u32, cutoff_page: u32) -> bool {
// Multixact utils
pub fn mx_offset_to_flags_offset(xid: MultiXactId) -> usize {
((xid / pg_constants::MULTIXACT_MEMBERS_PER_MEMBERGROUP as u32)
% pg_constants::MULTIXACT_MEMBERGROUPS_PER_PAGE as u32
* pg_constants::MULTIXACT_MEMBERGROUP_SIZE as u32) as usize
((xid / pg_constants::MULTIXACT_MEMBERS_PER_MEMBERGROUP as u32) as u16
% pg_constants::MULTIXACT_MEMBERGROUPS_PER_PAGE
* pg_constants::MULTIXACT_MEMBERGROUP_SIZE) as usize
}
pub fn mx_offset_to_flags_bitshift(xid: MultiXactId) -> u16 {
@@ -81,41 +81,3 @@ fn mx_offset_to_member_page(xid: u32) -> u32 {
pub fn mx_offset_to_member_segment(xid: u32) -> i32 {
(mx_offset_to_member_page(xid) / pg_constants::SLRU_PAGES_PER_SEGMENT) as i32
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_multixid_calc() {
// Check that the mx_offset_* functions produce the same values as the
// corresponding PostgreSQL C macros (MXOffsetTo*). These test values
// were generated by calling the PostgreSQL macros with a little C
// program.
assert_eq!(mx_offset_to_member_segment(0), 0);
assert_eq!(mx_offset_to_member_page(0), 0);
assert_eq!(mx_offset_to_flags_offset(0), 0);
assert_eq!(mx_offset_to_flags_bitshift(0), 0);
assert_eq!(mx_offset_to_member_offset(0), 4);
assert_eq!(mx_offset_to_member_segment(1), 0);
assert_eq!(mx_offset_to_member_page(1), 0);
assert_eq!(mx_offset_to_flags_offset(1), 0);
assert_eq!(mx_offset_to_flags_bitshift(1), 8);
assert_eq!(mx_offset_to_member_offset(1), 8);
assert_eq!(mx_offset_to_member_segment(123456789), 2358);
assert_eq!(mx_offset_to_member_page(123456789), 75462);
assert_eq!(mx_offset_to_flags_offset(123456789), 4780);
assert_eq!(mx_offset_to_flags_bitshift(123456789), 8);
assert_eq!(mx_offset_to_member_offset(123456789), 4788);
assert_eq!(mx_offset_to_member_segment(u32::MAX - 1), 82040);
assert_eq!(mx_offset_to_member_page(u32::MAX - 1), 2625285);
assert_eq!(mx_offset_to_flags_offset(u32::MAX - 1), 5160);
assert_eq!(mx_offset_to_flags_bitshift(u32::MAX - 1), 16);
assert_eq!(mx_offset_to_member_offset(u32::MAX - 1), 5172);
assert_eq!(mx_offset_to_member_segment(u32::MAX), 82040);
assert_eq!(mx_offset_to_member_page(u32::MAX), 2625285);
assert_eq!(mx_offset_to_flags_offset(u32::MAX), 5160);
assert_eq!(mx_offset_to_flags_bitshift(u32::MAX), 24);
assert_eq!(mx_offset_to_member_offset(u32::MAX), 5176);
}
}

View File

@@ -42,10 +42,6 @@ workspace_hack.workspace = true
const_format.workspace = true
# to use tokio channels as streams, this is faster to compile than async_stream
# why is it only here? no other crate should use it, streams are rarely needed.
tokio-stream = { version = "0.1.14" }
[dev-dependencies]
byteorder.workspace = true
bytes.workspace = true

View File

@@ -9,6 +9,7 @@ use metrics::{register_int_counter, Encoder, IntCounter, TextEncoder};
use once_cell::sync::Lazy;
use routerify::ext::RequestExt;
use routerify::{Middleware, RequestInfo, Router, RouterBuilder};
use tokio::task::JoinError;
use tracing::{self, debug, info, info_span, warn, Instrument};
use std::future::Future;
@@ -147,140 +148,26 @@ impl Drop for RequestCancelled {
}
async fn prometheus_metrics_handler(_req: Request<Body>) -> Result<Response<Body>, ApiError> {
use bytes::{Bytes, BytesMut};
use std::io::Write as _;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
SERVE_METRICS_COUNT.inc();
/// An [`std::io::Write`] implementation on top of a channel sending [`bytes::Bytes`] chunks.
struct ChannelWriter {
buffer: BytesMut,
tx: mpsc::Sender<std::io::Result<Bytes>>,
written: usize,
}
impl ChannelWriter {
fn new(buf_len: usize, tx: mpsc::Sender<std::io::Result<Bytes>>) -> Self {
assert_ne!(buf_len, 0);
ChannelWriter {
// split about half off the buffer from the start, because we flush depending on
// capacity. first flush will come sooner than without this, but now resizes will
// have better chance of picking up the "other" half. not guaranteed of course.
buffer: BytesMut::with_capacity(buf_len).split_off(buf_len / 2),
tx,
written: 0,
}
}
fn flush0(&mut self) -> std::io::Result<usize> {
let n = self.buffer.len();
if n == 0 {
return Ok(0);
}
tracing::trace!(n, "flushing");
let ready = self.buffer.split().freeze();
// not ideal to call from blocking code to block_on, but we are sure that this
// operation does not spawn_blocking other tasks
let res: Result<(), ()> = tokio::runtime::Handle::current().block_on(async {
self.tx.send(Ok(ready)).await.map_err(|_| ())?;
// throttle sending to allow reuse of our buffer in `write`.
self.tx.reserve().await.map_err(|_| ())?;
// now the response task has picked up the buffer and hopefully started
// sending it to the client.
Ok(())
});
if res.is_err() {
return Err(std::io::ErrorKind::BrokenPipe.into());
}
self.written += n;
Ok(n)
}
fn flushed_bytes(&self) -> usize {
self.written
}
}
impl std::io::Write for ChannelWriter {
fn write(&mut self, mut buf: &[u8]) -> std::io::Result<usize> {
let remaining = self.buffer.capacity() - self.buffer.len();
let out_of_space = remaining < buf.len();
let original_len = buf.len();
if out_of_space {
let can_still_fit = buf.len() - remaining;
self.buffer.extend_from_slice(&buf[..can_still_fit]);
buf = &buf[can_still_fit..];
self.flush0()?;
}
// assume that this will often under normal operation just move the pointer back to the
// beginning of allocation, because previous split off parts are already sent and
// dropped.
self.buffer.extend_from_slice(buf);
Ok(original_len)
}
fn flush(&mut self) -> std::io::Result<()> {
self.flush0().map(|_| ())
}
}
let started_at = std::time::Instant::now();
let (tx, rx) = mpsc::channel(1);
let body = Body::wrap_stream(ReceiverStream::new(rx));
let mut writer = ChannelWriter::new(128 * 1024, tx);
let mut buffer = vec![];
let encoder = TextEncoder::new();
let metrics = tokio::task::spawn_blocking(move || {
// Currently we take a lot of mutexes while collecting metrics, so it's
// better to spawn a blocking task to avoid blocking the event loop.
metrics::gather()
})
.await
.map_err(|e: JoinError| ApiError::InternalServerError(e.into()))?;
encoder.encode(&metrics, &mut buffer).unwrap();
let response = Response::builder()
.status(200)
.header(CONTENT_TYPE, encoder.format_type())
.body(body)
.body(Body::from(buffer))
.unwrap();
let span = info_span!("blocking");
tokio::task::spawn_blocking(move || {
let _span = span.entered();
let metrics = metrics::gather();
let res = encoder
.encode(&metrics, &mut writer)
.and_then(|_| writer.flush().map_err(|e| e.into()));
match res {
Ok(()) => {
tracing::info!(
bytes = writer.flushed_bytes(),
elapsed_ms = started_at.elapsed().as_millis(),
"responded /metrics"
);
}
Err(e) => {
tracing::warn!("failed to write out /metrics response: {e:#}");
// semantics of this error are quite... unclear. we want to error the stream out to
// abort the response to somehow notify the client that we failed.
//
// though, most likely the reason for failure is that the receiver is already gone.
drop(
writer
.tx
.blocking_send(Err(std::io::ErrorKind::BrokenPipe.into())),
);
}
}
});
Ok(response)
}

View File

@@ -35,8 +35,6 @@ humantime-serde.workspace = true
hyper.workspace = true
itertools.workspace = true
nix.workspace = true
# hack to get the number of worker threads tokio uses
num_cpus = { version = "1.15" }
num-traits.workspace = true
once_cell.workspace = true
pin-project-lite.workspace = true

View File

@@ -13,7 +13,6 @@ clap = { workspace = true, features = ["string"] }
git-version.workspace = true
pageserver = { path = ".." }
postgres_ffi.workspace = true
tokio.workspace = true
utils.workspace = true
svg_fmt.workspace = true
workspace_hack.workspace = true

View File

@@ -95,7 +95,7 @@ pub(crate) fn parse_filename(name: &str) -> Option<LayerFile> {
}
// Finds the max_holes largest holes, ignoring any that are smaller than MIN_HOLE_LENGTH"
async fn get_holes(path: &Path, max_holes: usize) -> Result<Vec<Hole>> {
fn get_holes(path: &Path, max_holes: usize) -> Result<Vec<Hole>> {
let file = FileBlockReader::new(VirtualFile::open(path)?);
let summary_blk = file.read_blk(0)?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
@@ -129,7 +129,7 @@ async fn get_holes(path: &Path, max_holes: usize) -> Result<Vec<Hole>> {
Ok(holes)
}
pub(crate) async fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> {
pub(crate) fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> {
let storage_path = &cmd.path;
let max_holes = cmd.max_holes.unwrap_or(DEFAULT_MAX_HOLES);
@@ -160,7 +160,7 @@ pub(crate) async fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> {
parse_filename(&layer.file_name().into_string().unwrap())
{
if layer_file.is_delta {
layer_file.holes = get_holes(&layer.path(), max_holes).await?;
layer_file.holes = get_holes(&layer.path(), max_holes)?;
n_deltas += 1;
}
layers.push(layer_file);

View File

@@ -43,7 +43,8 @@ pub(crate) enum LayerCmd {
},
}
async fn read_delta_file(path: impl AsRef<Path>) -> Result<()> {
fn read_delta_file(path: impl AsRef<Path>) -> Result<()> {
use pageserver::tenant::blob_io::BlobCursor;
use pageserver::tenant::block_io::BlockReader;
let path = path.as_ref();
@@ -77,7 +78,7 @@ async fn read_delta_file(path: impl AsRef<Path>) -> Result<()> {
Ok(())
}
pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
pub(crate) fn main(cmd: &LayerCmd) -> Result<()> {
match cmd {
LayerCmd::List { path } => {
for tenant in fs::read_dir(path.join("tenants"))? {
@@ -152,7 +153,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
);
if layer_file.is_delta {
read_delta_file(layer.path()).await?;
read_delta_file(layer.path())?;
} else {
anyhow::bail!("not supported yet :(");
}

View File

@@ -72,13 +72,12 @@ struct AnalyzeLayerMapCmd {
max_holes: Option<usize>,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
fn main() -> anyhow::Result<()> {
let cli = CliOpts::parse();
match cli.command {
Commands::Layer(cmd) => {
layers::main(&cmd).await?;
layers::main(&cmd)?;
}
Commands::Metadata(cmd) => {
handle_metadata(&cmd)?;
@@ -87,7 +86,7 @@ async fn main() -> anyhow::Result<()> {
draw_timeline_dir::main()?;
}
Commands::AnalyzeLayerMap(cmd) => {
layer_map_analyzer::main(&cmd).await?;
layer_map_analyzer::main(&cmd)?;
}
Commands::PrintLayerFile(cmd) => {
if let Err(e) = read_pg_control_file(&cmd.path) {
@@ -95,7 +94,7 @@ async fn main() -> anyhow::Result<()> {
"Failed to read input file as a pg control one: {e:#}\n\
Attempting to read it as layer file"
);
print_layerfile(&cmd.path).await?;
print_layerfile(&cmd.path)?;
}
}
};
@@ -114,12 +113,12 @@ fn read_pg_control_file(control_file_path: &Path) -> anyhow::Result<()> {
Ok(())
}
async fn print_layerfile(path: &Path) -> anyhow::Result<()> {
fn print_layerfile(path: &Path) -> anyhow::Result<()> {
// Basic initialization of things that don't change after startup
virtual_file::init(10);
page_cache::init(100);
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
dump_layerfile_from_path(path, true, &ctx).await
dump_layerfile_from_path(path, true, &ctx)
}
fn handle_metadata(

View File

@@ -994,29 +994,31 @@ async fn timeline_gc_handler(
// Run compaction immediately on given timeline.
async fn timeline_compact_handler(
request: Request<Body>,
cancel: CancellationToken,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_id))?;
async {
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
timeline
.compact(&cancel, &ctx)
.await
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, ())
}
.instrument(info_span!("manual_compaction", %tenant_id, %timeline_id))
.await
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let result_receiver = mgr::immediate_compact(tenant_id, timeline_id, &ctx)
.await
.context("spawn compaction task")
.map_err(ApiError::InternalServerError)?;
let result: anyhow::Result<()> = result_receiver
.await
.context("receive compaction result")
.map_err(ApiError::InternalServerError)?;
result.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, ())
}
// Run checkpoint immediately on given timeline.
async fn timeline_checkpoint_handler(
request: Request<Body>,
cancel: CancellationToken,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
@@ -1029,13 +1031,13 @@ async fn timeline_checkpoint_handler(
.await
.map_err(ApiError::InternalServerError)?;
timeline
.compact(&cancel, &ctx)
.compact(&ctx)
.await
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, ())
}
.instrument(info_span!("manual_checkpoint", %tenant_id, %timeline_id))
.instrument(info_span!("manual_checkpoint", tenant_id = %tenant_id, timeline_id = %timeline_id))
.await
}

View File

@@ -6,6 +6,7 @@ use metrics::{
IntCounterVec, IntGauge, IntGaugeVec, UIntGauge, UIntGaugeVec,
};
use once_cell::sync::Lazy;
use pageserver_api::models::TenantState;
use strum::VariantNames;
use strum_macros::{EnumVariantNames, IntoStaticStr};
use utils::id::{TenantId, TimelineId};
@@ -83,10 +84,11 @@ pub static STORAGE_TIME_GLOBAL: Lazy<HistogramVec> = Lazy::new(|| {
.expect("failed to define a metric")
});
pub(crate) static READ_NUM_FS_LAYERS: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
static READ_NUM_FS_LAYERS: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"pageserver_read_num_fs_layers",
"Number of persistent layers accessed for processing a read request, including those in the cache",
&["tenant_id", "timeline_id"],
vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 10.0, 20.0, 50.0, 100.0],
)
.expect("failed to define a metric")
@@ -110,10 +112,11 @@ pub static MATERIALIZED_PAGE_CACHE_HIT_DIRECT: Lazy<IntCounter> = Lazy::new(|| {
.expect("failed to define a metric")
});
pub(crate) static GET_RECONSTRUCT_DATA_TIME: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
static GET_RECONSTRUCT_DATA_TIME: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"pageserver_getpage_get_reconstruct_data_seconds",
"Time spent in get_reconstruct_value_data",
&["tenant_id", "timeline_id"],
CRITICAL_OP_BUCKETS.into(),
)
.expect("failed to define a metric")
@@ -243,10 +246,11 @@ pub static PAGE_CACHE_SIZE: Lazy<PageCacheSizeMetrics> = Lazy::new(|| PageCacheS
},
});
pub(crate) static WAIT_LSN_TIME: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
static WAIT_LSN_TIME: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"pageserver_wait_lsn_seconds",
"Time spent waiting for WAL to arrive",
&["tenant_id", "timeline_id"],
CRITICAL_OP_BUCKETS.into(),
)
.expect("failed to define a metric")
@@ -305,24 +309,11 @@ static CURRENT_LOGICAL_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
.expect("failed to define current logical size metric")
});
pub(crate) static TENANT_STATE_METRIC: Lazy<UIntGaugeVec> = Lazy::new(|| {
pub static TENANT_STATE_METRIC: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"pageserver_tenant_states_count",
"Count of tenants per state",
&["state"]
)
.expect("Failed to register pageserver_tenant_states_count metric")
});
/// A set of broken tenants.
///
/// These are expected to be so rare that a set is fine. Set as in a new timeseries per each broken
/// tenant.
pub(crate) static BROKEN_TENANTS_SET: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"pageserver_broken_tenants_count",
"Set of broken tenants",
&["tenant_id"]
&["tenant_id", "state"]
)
.expect("Failed to register pageserver_tenant_states_count metric")
});
@@ -508,31 +499,23 @@ const STORAGE_IO_TIME_BUCKETS: &[f64] = &[
30.000, // 30000 ms
];
/// Tracks time taken by fs operations near VirtualFile.
///
/// Operations:
/// - open ([`std::fs::OpenOptions::open`])
/// - close (dropping [`std::fs::File`])
/// - close-by-replace (close by replacement algorithm)
/// - read (`read_at`)
/// - write (`write_at`)
/// - seek (modify internal position or file length query)
/// - fsync ([`std::fs::File::sync_all`])
/// - metadata ([`std::fs::File::metadata`])
pub(crate) static STORAGE_IO_TIME: Lazy<HistogramVec> = Lazy::new(|| {
const STORAGE_IO_TIME_OPERATIONS: &[&str] = &[
"open", "close", "read", "write", "seek", "fsync", "gc", "metadata",
];
const STORAGE_IO_SIZE_OPERATIONS: &[&str] = &["read", "write"];
pub static STORAGE_IO_TIME: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"pageserver_io_operations_seconds",
"Time spent in IO operations",
&["operation"],
&["operation", "tenant_id", "timeline_id"],
STORAGE_IO_TIME_BUCKETS.into()
)
.expect("failed to define a metric")
});
const STORAGE_IO_SIZE_OPERATIONS: &[&str] = &["read", "write"];
// Needed for the https://neonprod.grafana.net/d/5uK9tHL4k/picking-tenant-for-relocation?orgId=1
pub(crate) static STORAGE_IO_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
pub static STORAGE_IO_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"pageserver_io_operations_bytes_total",
"Total amount of bytes read/written in IO operations",
@@ -622,7 +605,7 @@ static REMOTE_TIMELINE_CLIENT_CALLS_STARTED_HIST: Lazy<HistogramVec> = Lazy::new
at a given instant. It gives you a better idea of the queue depth \
than plotting the gauge directly, since operations may complete faster \
than the sampling interval.",
&["file_kind", "op_kind"],
&["tenant_id", "timeline_id", "file_kind", "op_kind"],
// The calls_unfinished gauge is an integer gauge, hence we have integer buckets.
vec![0.0, 1.0, 2.0, 4.0, 6.0, 8.0, 10.0, 15.0, 20.0, 40.0, 60.0, 80.0, 100.0, 500.0],
)
@@ -679,13 +662,13 @@ impl RemoteOpFileKind {
}
}
pub(crate) static REMOTE_OPERATION_TIME: Lazy<HistogramVec> = Lazy::new(|| {
pub static REMOTE_OPERATION_TIME: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"pageserver_remote_operation_seconds",
"Time spent on remote storage operations. \
Grouped by tenant, timeline, operation_kind and status. \
Does not account for time spent waiting in remote timeline client's queues.",
&["file_kind", "op_kind", "status"]
&["tenant_id", "timeline_id", "file_kind", "op_kind", "status"]
)
.expect("failed to define a metric")
});
@@ -914,6 +897,7 @@ impl StorageTimeMetrics {
pub struct TimelineMetrics {
tenant_id: String,
timeline_id: String,
pub get_reconstruct_data_time_histo: Histogram,
pub flush_time_histo: StorageTimeMetrics,
pub compact_time_histo: StorageTimeMetrics,
pub create_images_time_histo: StorageTimeMetrics,
@@ -922,7 +906,9 @@ pub struct TimelineMetrics {
pub load_layer_map_histo: StorageTimeMetrics,
pub garbage_collect_histo: StorageTimeMetrics,
pub last_record_gauge: IntGauge,
pub wait_lsn_time_histo: Histogram,
pub resident_physical_size_gauge: UIntGauge,
pub read_num_fs_layers: Histogram,
/// copy of LayeredTimeline.current_logical_size
pub current_logical_size_gauge: UIntGauge,
pub num_persistent_files_created: IntCounter,
@@ -939,6 +925,9 @@ impl TimelineMetrics {
) -> Self {
let tenant_id = tenant_id.to_string();
let timeline_id = timeline_id.to_string();
let get_reconstruct_data_time_histo = GET_RECONSTRUCT_DATA_TIME
.get_metric_with_label_values(&[&tenant_id, &timeline_id])
.unwrap();
let flush_time_histo =
StorageTimeMetrics::new(StorageTimeOperation::LayerFlush, &tenant_id, &timeline_id);
let compact_time_histo =
@@ -959,6 +948,9 @@ impl TimelineMetrics {
let last_record_gauge = LAST_RECORD_LSN
.get_metric_with_label_values(&[&tenant_id, &timeline_id])
.unwrap();
let wait_lsn_time_histo = WAIT_LSN_TIME
.get_metric_with_label_values(&[&tenant_id, &timeline_id])
.unwrap();
let resident_physical_size_gauge = RESIDENT_PHYSICAL_SIZE
.get_metric_with_label_values(&[&tenant_id, &timeline_id])
.unwrap();
@@ -974,12 +966,16 @@ impl TimelineMetrics {
let evictions = EVICTIONS
.get_metric_with_label_values(&[&tenant_id, &timeline_id])
.unwrap();
let read_num_fs_layers = READ_NUM_FS_LAYERS
.get_metric_with_label_values(&[&tenant_id, &timeline_id])
.unwrap();
let evictions_with_low_residence_duration =
evictions_with_low_residence_duration_builder.build(&tenant_id, &timeline_id);
TimelineMetrics {
tenant_id,
timeline_id,
get_reconstruct_data_time_histo,
flush_time_histo,
compact_time_histo,
create_images_time_histo,
@@ -988,6 +984,7 @@ impl TimelineMetrics {
garbage_collect_histo,
load_layer_map_histo,
last_record_gauge,
wait_lsn_time_histo,
resident_physical_size_gauge,
current_logical_size_gauge,
num_persistent_files_created,
@@ -996,6 +993,7 @@ impl TimelineMetrics {
evictions_with_low_residence_duration: std::sync::RwLock::new(
evictions_with_low_residence_duration,
),
read_num_fs_layers,
}
}
}
@@ -1004,12 +1002,15 @@ impl Drop for TimelineMetrics {
fn drop(&mut self) {
let tenant_id = &self.tenant_id;
let timeline_id = &self.timeline_id;
let _ = GET_RECONSTRUCT_DATA_TIME.remove_label_values(&[tenant_id, timeline_id]);
let _ = LAST_RECORD_LSN.remove_label_values(&[tenant_id, timeline_id]);
let _ = WAIT_LSN_TIME.remove_label_values(&[tenant_id, timeline_id]);
let _ = RESIDENT_PHYSICAL_SIZE.remove_label_values(&[tenant_id, timeline_id]);
let _ = CURRENT_LOGICAL_SIZE.remove_label_values(&[tenant_id, timeline_id]);
let _ = NUM_PERSISTENT_FILES_CREATED.remove_label_values(&[tenant_id, timeline_id]);
let _ = PERSISTENT_BYTES_WRITTEN.remove_label_values(&[tenant_id, timeline_id]);
let _ = EVICTIONS.remove_label_values(&[tenant_id, timeline_id]);
let _ = READ_NUM_FS_LAYERS.remove_label_values(&[tenant_id, timeline_id]);
self.evictions_with_low_residence_duration
.write()
@@ -1021,6 +1022,9 @@ impl Drop for TimelineMetrics {
let _ =
STORAGE_TIME_COUNT_PER_TIMELINE.remove_label_values(&[op, tenant_id, timeline_id]);
}
for op in STORAGE_IO_TIME_OPERATIONS {
let _ = STORAGE_IO_TIME.remove_label_values(&[op, tenant_id, timeline_id]);
}
for op in STORAGE_IO_SIZE_OPERATIONS {
let _ = STORAGE_IO_SIZE.remove_label_values(&[op, tenant_id, timeline_id]);
@@ -1035,7 +1039,9 @@ impl Drop for TimelineMetrics {
pub fn remove_tenant_metrics(tenant_id: &TenantId) {
let tid = tenant_id.to_string();
let _ = TENANT_SYNTHETIC_SIZE_METRIC.remove_label_values(&[&tid]);
// we leave the BROKEN_TENANTS_SET entry if any
for state in TenantState::VARIANTS {
let _ = TENANT_STATE_METRIC.remove_label_values(&[&tid, state]);
}
}
use futures::Future;
@@ -1050,7 +1056,9 @@ pub struct RemoteTimelineClientMetrics {
tenant_id: String,
timeline_id: String,
remote_physical_size_gauge: Mutex<Option<UIntGauge>>,
remote_operation_time: Mutex<HashMap<(&'static str, &'static str, &'static str), Histogram>>,
calls_unfinished_gauge: Mutex<HashMap<(&'static str, &'static str), IntGauge>>,
calls_started_hist: Mutex<HashMap<(&'static str, &'static str), Histogram>>,
bytes_started_counter: Mutex<HashMap<(&'static str, &'static str), IntCounter>>,
bytes_finished_counter: Mutex<HashMap<(&'static str, &'static str), IntCounter>>,
}
@@ -1060,13 +1068,14 @@ impl RemoteTimelineClientMetrics {
RemoteTimelineClientMetrics {
tenant_id: tenant_id.to_string(),
timeline_id: timeline_id.to_string(),
remote_operation_time: Mutex::new(HashMap::default()),
calls_unfinished_gauge: Mutex::new(HashMap::default()),
calls_started_hist: Mutex::new(HashMap::default()),
bytes_started_counter: Mutex::new(HashMap::default()),
bytes_finished_counter: Mutex::new(HashMap::default()),
remote_physical_size_gauge: Mutex::new(None),
}
}
pub fn remote_physical_size_gauge(&self) -> UIntGauge {
let mut guard = self.remote_physical_size_gauge.lock().unwrap();
guard
@@ -1080,17 +1089,26 @@ impl RemoteTimelineClientMetrics {
})
.clone()
}
pub fn remote_operation_time(
&self,
file_kind: &RemoteOpFileKind,
op_kind: &RemoteOpKind,
status: &'static str,
) -> Histogram {
let mut guard = self.remote_operation_time.lock().unwrap();
let key = (file_kind.as_str(), op_kind.as_str(), status);
REMOTE_OPERATION_TIME
.get_metric_with_label_values(&[key.0, key.1, key.2])
.unwrap()
let metric = guard.entry(key).or_insert_with(move || {
REMOTE_OPERATION_TIME
.get_metric_with_label_values(&[
&self.tenant_id.to_string(),
&self.timeline_id.to_string(),
key.0,
key.1,
key.2,
])
.unwrap()
});
metric.clone()
}
fn calls_unfinished_gauge(
@@ -1118,10 +1136,19 @@ impl RemoteTimelineClientMetrics {
file_kind: &RemoteOpFileKind,
op_kind: &RemoteOpKind,
) -> Histogram {
let mut guard = self.calls_started_hist.lock().unwrap();
let key = (file_kind.as_str(), op_kind.as_str());
REMOTE_TIMELINE_CLIENT_CALLS_STARTED_HIST
.get_metric_with_label_values(&[key.0, key.1])
.unwrap()
let metric = guard.entry(key).or_insert_with(move || {
REMOTE_TIMELINE_CLIENT_CALLS_STARTED_HIST
.get_metric_with_label_values(&[
&self.tenant_id.to_string(),
&self.timeline_id.to_string(),
key.0,
key.1,
])
.unwrap()
});
metric.clone()
}
fn bytes_started_counter(
@@ -1301,10 +1328,15 @@ impl Drop for RemoteTimelineClientMetrics {
tenant_id,
timeline_id,
remote_physical_size_gauge,
remote_operation_time,
calls_unfinished_gauge,
calls_started_hist,
bytes_started_counter,
bytes_finished_counter,
} = self;
for ((a, b, c), _) in remote_operation_time.get_mut().unwrap().drain() {
let _ = REMOTE_OPERATION_TIME.remove_label_values(&[tenant_id, timeline_id, a, b, c]);
}
for ((a, b), _) in calls_unfinished_gauge.get_mut().unwrap().drain() {
let _ = REMOTE_TIMELINE_CLIENT_CALLS_UNFINISHED_GAUGE.remove_label_values(&[
tenant_id,
@@ -1313,6 +1345,14 @@ impl Drop for RemoteTimelineClientMetrics {
b,
]);
}
for ((a, b), _) in calls_started_hist.get_mut().unwrap().drain() {
let _ = REMOTE_TIMELINE_CLIENT_CALLS_STARTED_HIST.remove_label_values(&[
tenant_id,
timeline_id,
a,
b,
]);
}
for ((a, b), _) in bytes_started_counter.get_mut().unwrap().drain() {
let _ = REMOTE_TIMELINE_CLIENT_BYTES_STARTED_COUNTER.remove_label_values(&[
tenant_id,

View File

@@ -130,25 +130,11 @@ pub static WALRECEIVER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
pub static BACKGROUND_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
tokio::runtime::Builder::new_multi_thread()
.thread_name("background op worker")
// if you change the number of worker threads please change the constant below
.enable_all()
.build()
.expect("Failed to create background op runtime")
});
pub(crate) static BACKGROUND_RUNTIME_WORKER_THREADS: Lazy<usize> = Lazy::new(|| {
// force init and thus panics
let _ = BACKGROUND_RUNTIME.handle();
// replicates tokio-1.28.1::loom::sys::num_cpus which is not available publicly
// tokio would had already panicked for parsing errors or NotUnicode
//
// this will be wrong if any of the runtimes gets their worker threads configured to something
// else, but that has not been needed in a long time.
std::env::var("TOKIO_WORKER_THREADS")
.map(|s| s.parse::<usize>().unwrap())
.unwrap_or_else(|_e| usize::max(1, num_cpus::get()))
});
#[derive(Debug, Clone, Copy)]
pub struct PageserverTaskId(u64);
@@ -559,7 +545,7 @@ pub fn current_task_id() -> Option<PageserverTaskId> {
pub async fn shutdown_watcher() {
let token = SHUTDOWN_TOKEN
.try_with(|t| t.clone())
.expect("shutdown_watcher() called in an unexpected task or thread");
.expect("shutdown_requested() called in an unexpected task or thread");
token.cancelled().await;
}

View File

@@ -20,7 +20,6 @@ use storage_broker::BrokerClientChannel;
use tokio::sync::watch;
use tokio::sync::OwnedMutexGuard;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::completion;
use utils::crashsafe::path_with_suffix_extension;
@@ -1336,11 +1335,7 @@ impl Tenant {
/// This function is periodically called by compactor task.
/// Also it can be explicitly requested per timeline through page server
/// api's 'compact' command.
pub async fn compaction_iteration(
&self,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<()> {
pub async fn compaction_iteration(&self, ctx: &RequestContext) -> anyhow::Result<()> {
anyhow::ensure!(
self.is_active(),
"Cannot run compaction iteration on inactive tenant"
@@ -1368,7 +1363,7 @@ impl Tenant {
for (timeline_id, timeline) in &timelines_to_compact {
timeline
.compact(cancel, ctx)
.compact(ctx)
.instrument(info_span!("compact_timeline", %timeline_id))
.await?;
}
@@ -2199,44 +2194,28 @@ impl Tenant {
let (state, mut rx) = watch::channel(state);
tokio::spawn(async move {
let mut current_state: &'static str = From::from(&*rx.borrow_and_update());
let tid = tenant_id.to_string();
fn inspect_state(state: &TenantState) -> ([&'static str; 1], bool) {
([state.into()], matches!(state, TenantState::Broken { .. }))
}
let mut tuple = inspect_state(&rx.borrow_and_update());
let is_broken = tuple.1;
if !is_broken {
// the tenant might be ignored and reloaded, so first remove any previous set
// element. it most likely has already been scraped, as these are manual operations
// right now. most likely we will add it back very soon.
drop(crate::metrics::BROKEN_TENANTS_SET.remove_label_values(&[&tid]));
}
TENANT_STATE_METRIC
.with_label_values(&[&tid, current_state])
.inc();
loop {
let labels = &tuple.0;
let current = TENANT_STATE_METRIC.with_label_values(labels);
current.inc();
match rx.changed().await {
Ok(()) => {
let new_state: &'static str = From::from(&*rx.borrow_and_update());
TENANT_STATE_METRIC
.with_label_values(&[&tid, current_state])
.dec();
TENANT_STATE_METRIC
.with_label_values(&[&tid, new_state])
.inc();
if rx.changed().await.is_err() {
// tenant has been dropped; decrement the counter because a tenant with that
// state is no longer in tenant map, but allow any broken set item to exist
// still.
current.dec();
break;
}
current.dec();
tuple = inspect_state(&rx.borrow_and_update());
let is_broken = tuple.1;
if is_broken {
// insert the tenant_id (back) into the set
crate::metrics::BROKEN_TENANTS_SET
.with_label_values(&[&tid])
.inc();
current_state = new_state;
}
Err(_sender_dropped_error) => {
info!("Tenant dropped the state updates sender, quitting waiting for tenant state change");
return;
}
}
}
});
@@ -3231,7 +3210,7 @@ impl Drop for Tenant {
}
}
/// Dump contents of a layer file to stdout.
pub async fn dump_layerfile_from_path(
pub fn dump_layerfile_from_path(
path: &Path,
verbose: bool,
ctx: &RequestContext,
@@ -3245,16 +3224,8 @@ pub async fn dump_layerfile_from_path(
file.read_exact_at(&mut header_buf, 0)?;
match u16::from_be_bytes(header_buf) {
crate::IMAGE_FILE_MAGIC => {
ImageLayer::new_for_path(path, file)?
.dump(verbose, ctx)
.await?
}
crate::DELTA_FILE_MAGIC => {
DeltaLayer::new_for_path(path, file)?
.dump(verbose, ctx)
.await?
}
crate::IMAGE_FILE_MAGIC => ImageLayer::new_for_path(path, file)?.dump(verbose, ctx)?,
crate::DELTA_FILE_MAGIC => DeltaLayer::new_for_path(path, file)?.dump(verbose, ctx)?,
magic => bail!("unrecognized magic identifier: {:?}", magic),
}
@@ -3470,7 +3441,6 @@ mod tests {
use hex_literal::hex;
use once_cell::sync::Lazy;
use rand::{thread_rng, Rng};
use tokio_util::sync::CancellationToken;
static TEST_KEY: Lazy<Key> =
Lazy::new(|| Key::from_slice(&hex!("112222222233333333444444445500000001")));
@@ -3992,7 +3962,7 @@ mod tests {
drop(writer);
tline.freeze_and_flush().await?;
tline.compact(&CancellationToken::new(), &ctx).await?;
tline.compact(&ctx).await?;
let writer = tline.writer().await;
writer
@@ -4002,7 +3972,7 @@ mod tests {
drop(writer);
tline.freeze_and_flush().await?;
tline.compact(&CancellationToken::new(), &ctx).await?;
tline.compact(&ctx).await?;
let writer = tline.writer().await;
writer
@@ -4012,7 +3982,7 @@ mod tests {
drop(writer);
tline.freeze_and_flush().await?;
tline.compact(&CancellationToken::new(), &ctx).await?;
tline.compact(&ctx).await?;
let writer = tline.writer().await;
writer
@@ -4022,7 +3992,7 @@ mod tests {
drop(writer);
tline.freeze_and_flush().await?;
tline.compact(&CancellationToken::new(), &ctx).await?;
tline.compact(&ctx).await?;
assert_eq!(
tline.get(*TEST_KEY, Lsn(0x10), &ctx).await?,
@@ -4091,7 +4061,7 @@ mod tests {
.update_gc_info(Vec::new(), cutoff, Duration::ZERO, &ctx)
.await?;
tline.freeze_and_flush().await?;
tline.compact(&CancellationToken::new(), &ctx).await?;
tline.compact(&ctx).await?;
tline.gc().await?;
}
@@ -4168,7 +4138,7 @@ mod tests {
.update_gc_info(Vec::new(), cutoff, Duration::ZERO, &ctx)
.await?;
tline.freeze_and_flush().await?;
tline.compact(&CancellationToken::new(), &ctx).await?;
tline.compact(&ctx).await?;
tline.gc().await?;
}
@@ -4256,7 +4226,7 @@ mod tests {
.update_gc_info(Vec::new(), cutoff, Duration::ZERO, &ctx)
.await?;
tline.freeze_and_flush().await?;
tline.compact(&CancellationToken::new(), &ctx).await?;
tline.compact(&ctx).await?;
tline.gc().await?;
}

View File

@@ -16,19 +16,29 @@ use crate::tenant::block_io::{BlockCursor, BlockReader};
use std::cmp::min;
use std::io::{Error, ErrorKind};
impl<R> BlockCursor<R>
where
R: BlockReader,
{
/// For reading
pub trait BlobCursor {
/// Read a blob into a new buffer.
pub fn read_blob(&mut self, offset: u64) -> Result<Vec<u8>, std::io::Error> {
fn read_blob(&mut self, offset: u64) -> Result<Vec<u8>, std::io::Error> {
let mut buf = Vec::new();
self.read_blob_into_buf(offset, &mut buf)?;
Ok(buf)
}
/// Read blob into the given buffer. Any previous contents in the buffer
/// are overwritten.
pub fn read_blob_into_buf(
fn read_blob_into_buf(
&mut self,
offset: u64,
dstbuf: &mut Vec<u8>,
) -> Result<(), std::io::Error>;
}
impl<R> BlobCursor for BlockCursor<R>
where
R: BlockReader,
{
fn read_blob_into_buf(
&mut self,
offset: u64,
dstbuf: &mut Vec<u8>,

View File

@@ -328,7 +328,7 @@ fn to_io_error(e: anyhow::Error, context: &str) -> io::Error {
#[cfg(test)]
mod tests {
use super::*;
use crate::tenant::blob_io::BlobWriter;
use crate::tenant::blob_io::{BlobCursor, BlobWriter};
use crate::tenant::block_io::BlockCursor;
use rand::{seq::SliceRandom, thread_rng, RngCore};
use std::fs;

View File

@@ -626,17 +626,17 @@ impl LayerMap {
/// debugging function to print out the contents of the layer map
#[allow(unused)]
pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
pub fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
println!("Begin dump LayerMap");
println!("open_layer:");
if let Some(open_layer) = &self.open_layer {
open_layer.dump(verbose, ctx).await?;
open_layer.dump(verbose, ctx)?;
}
println!("frozen_layers:");
for frozen_layer in self.frozen_layers.iter() {
frozen_layer.dump(verbose, ctx).await?;
frozen_layer.dump(verbose, ctx)?;
}
println!("historic_layers:");

View File

@@ -768,6 +768,55 @@ pub async fn immediate_gc(
Ok(wait_task_done)
}
pub async fn immediate_compact(
tenant_id: TenantId,
timeline_id: TimelineId,
ctx: &RequestContext,
) -> Result<tokio::sync::oneshot::Receiver<anyhow::Result<()>>, ApiError> {
let guard = TENANTS.read().await;
let tenant = guard
.get(&tenant_id)
.map(Arc::clone)
.with_context(|| format!("tenant {tenant_id}"))
.map_err(|e| ApiError::NotFound(e.into()))?;
let timeline = tenant
.get_timeline(timeline_id, true)
.map_err(|e| ApiError::NotFound(e.into()))?;
// Run in task_mgr to avoid race with tenant_detach operation
let ctx = ctx.detached_child(TaskKind::Compaction, DownloadBehavior::Download);
let (task_done, wait_task_done) = tokio::sync::oneshot::channel();
task_mgr::spawn(
&tokio::runtime::Handle::current(),
TaskKind::Compaction,
Some(tenant_id),
Some(timeline_id),
&format!(
"timeline_compact_handler compaction run for tenant {tenant_id} timeline {timeline_id}"
),
false,
async move {
let result = timeline
.compact(&ctx)
.instrument(info_span!("manual_compact", %tenant_id, %timeline_id))
.await;
match task_done.send(result) {
Ok(_) => (),
Err(result) => error!("failed to send compaction result: {result:?}"),
}
Ok(())
},
);
// drop the guard until after we've spawned the task so that timeline shutdown will wait for the task
drop(guard);
Ok(wait_task_done)
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;

View File

@@ -338,8 +338,7 @@ impl LayerAccessStats {
/// All layers should implement a minimal `std::fmt::Debug` without tenant or
/// timeline names, because those are known in the context of which the layers
/// are used in (timeline).
#[async_trait::async_trait]
pub trait Layer: std::fmt::Debug + std::fmt::Display + Send + Sync + 'static {
pub trait Layer: std::fmt::Debug + std::fmt::Display + Send + Sync {
/// Range of keys that this layer covers
fn get_key_range(&self) -> Range<Key>;
@@ -369,7 +368,7 @@ pub trait Layer: std::fmt::Debug + std::fmt::Display + Send + Sync + 'static {
/// is available. If this returns ValueReconstructResult::Continue, look up
/// the predecessor layer and call again with the same 'reconstruct_data' to
/// collect more data.
async fn get_value_reconstruct_data(
fn get_value_reconstruct_data(
&self,
key: Key,
lsn_range: Range<Lsn>,
@@ -378,7 +377,7 @@ pub trait Layer: std::fmt::Debug + std::fmt::Display + Send + Sync + 'static {
) -> Result<ValueReconstructResult>;
/// Dump summary of the contents of the layer to stdout
async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()>;
fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()>;
}
/// Returned by [`PersistentLayer::iter`]

View File

@@ -31,7 +31,7 @@ use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::page_cache::{PageReadGuard, PAGE_SZ};
use crate::repository::{Key, Value, KEY_SIZE};
use crate::tenant::blob_io::{BlobWriter, WriteBlobWriter};
use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockReader, FileBlockReader};
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
use crate::tenant::storage_layer::{
@@ -223,10 +223,9 @@ impl std::fmt::Debug for DeltaLayerInner {
}
}
#[async_trait::async_trait]
impl Layer for DeltaLayer {
/// debugging function to print out the contents of the layer
async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
println!(
"----- delta layer for ten {} tli {} keys {}-{} lsn {}-{} size {} ----",
self.desc.tenant_id,
@@ -301,7 +300,7 @@ impl Layer for DeltaLayer {
Ok(())
}
async fn get_value_reconstruct_data(
fn get_value_reconstruct_data(
&self,
key: Key,
lsn_range: Range<Lsn>,

View File

@@ -27,7 +27,7 @@ use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::page_cache::PAGE_SZ;
use crate::repository::{Key, KEY_SIZE};
use crate::tenant::blob_io::{BlobWriter, WriteBlobWriter};
use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
use crate::tenant::storage_layer::{
@@ -155,10 +155,9 @@ impl std::fmt::Debug for ImageLayerInner {
}
}
#[async_trait::async_trait]
impl Layer for ImageLayer {
/// debugging function to print out the contents of the layer
async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
println!(
"----- image layer for ten {} tli {} key {}-{} at {} is_incremental {} size {} ----",
self.desc.tenant_id,
@@ -190,7 +189,7 @@ impl Layer for ImageLayer {
}
/// Look up given page in the file
async fn get_value_reconstruct_data(
fn get_value_reconstruct_data(
&self,
key: Key,
lsn_range: Range<Lsn>,

View File

@@ -7,7 +7,7 @@
use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::repository::{Key, Value};
use crate::tenant::blob_io::BlobWriter;
use crate::tenant::blob_io::{BlobCursor, BlobWriter};
use crate::tenant::block_io::BlockReader;
use crate::tenant::ephemeral_file::EphemeralFile;
use crate::tenant::storage_layer::{ValueReconstructResult, ValueReconstructState};
@@ -110,7 +110,6 @@ impl InMemoryLayer {
}
}
#[async_trait::async_trait]
impl Layer for InMemoryLayer {
fn get_key_range(&self) -> Range<Key> {
Key::MIN..Key::MAX
@@ -133,7 +132,7 @@ impl Layer for InMemoryLayer {
}
/// debugging function to print out the contents of the layer
async fn dump(&self, verbose: bool, _ctx: &RequestContext) -> Result<()> {
fn dump(&self, verbose: bool, _ctx: &RequestContext) -> Result<()> {
let inner = self.inner.read().unwrap();
let end_str = inner
@@ -184,7 +183,7 @@ impl Layer for InMemoryLayer {
}
/// Look up given value in the layer.
async fn get_value_reconstruct_data(
fn get_value_reconstruct_data(
&self,
key: Key,
lsn_range: Range<Lsn>,

View File

@@ -65,9 +65,8 @@ impl std::fmt::Debug for RemoteLayer {
}
}
#[async_trait::async_trait]
impl Layer for RemoteLayer {
async fn get_value_reconstruct_data(
fn get_value_reconstruct_data(
&self,
_key: Key,
_lsn_range: Range<Lsn>,
@@ -78,7 +77,7 @@ impl Layer for RemoteLayer {
}
/// debugging function to print out the contents of the layer
async fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
println!(
"----- remote layer for ten {} tli {} keys {}-{} lsn {}-{} is_delta {} is_incremental {} size {} ----",
self.desc.tenant_id,

View File

@@ -111,7 +111,7 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
Duration::from_secs(10)
} else {
// Run compaction
if let Err(e) = tenant.compaction_iteration(&cancel, &ctx).await {
if let Err(e) = tenant.compaction_iteration(&ctx).await {
error!("Compaction failed, retrying in {:?}: {e:?}", wait_duration);
wait_duration
} else {

View File

@@ -334,7 +334,7 @@ pub struct GcInfo {
#[derive(thiserror::Error)]
pub enum PageReconstructError {
#[error(transparent)]
Other(#[from] anyhow::Error),
Other(#[from] anyhow::Error), // source and Display delegate to anyhow::Error
/// The operation would require downloading a layer that is missing locally.
NeedsDownload(TenantTimelineId, LayerFileName),
@@ -475,7 +475,7 @@ impl Timeline {
img: cached_page_img,
};
let timer = crate::metrics::GET_RECONSTRUCT_DATA_TIME.start_timer();
let timer = self.metrics.get_reconstruct_data_time_histo.start_timer();
self.get_reconstruct_data(key, lsn, &mut reconstruct_state, ctx)
.await?;
timer.stop_and_record();
@@ -555,7 +555,7 @@ impl Timeline {
"wait_lsn cannot be called in WAL receiver"
);
let _timer = crate::metrics::WAIT_LSN_TIME.start_timer();
let _timer = self.metrics.wait_lsn_time_histo.start_timer();
match self
.last_record_lsn
@@ -611,46 +611,9 @@ impl Timeline {
}
/// Outermost timeline compaction operation; downloads needed layers.
pub async fn compact(
self: &Arc<Self>,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<()> {
pub async fn compact(self: &Arc<Self>, ctx: &RequestContext) -> anyhow::Result<()> {
const ROUNDS: usize = 2;
static CONCURRENT_COMPACTIONS: once_cell::sync::Lazy<tokio::sync::Semaphore> =
once_cell::sync::Lazy::new(|| {
let total_threads = *task_mgr::BACKGROUND_RUNTIME_WORKER_THREADS;
let permits = usize::max(
1,
// while a lot of the work is done on spawn_blocking, we still do
// repartitioning in the async context. this should give leave us some workers
// unblocked to be blocked on other work, hopefully easing any outside visible
// effects of restarts.
//
// 6/8 is a guess; previously we ran with unlimited 8 and more from
// spawn_blocking.
(total_threads * 3).checked_div(4).unwrap_or(0),
);
assert_ne!(permits, 0, "we will not be adding in permits later");
assert!(
permits < total_threads,
"need threads avail for shorter work"
);
tokio::sync::Semaphore::new(permits)
});
// this wait probably never needs any "long time spent" logging, because we already nag if
// compaction task goes over it's period (20s) which is quite often in production.
let _permit = tokio::select! {
permit = CONCURRENT_COMPACTIONS.acquire() => {
permit
},
_ = cancel.cancelled() => {
return Ok(());
}
};
let last_record_lsn = self.get_last_record_lsn();
// Last record Lsn could be zero in case the timeline was just created
@@ -708,9 +671,11 @@ impl Timeline {
let mut failed = 0;
let mut cancelled = pin!(task_mgr::shutdown_watcher());
loop {
tokio::select! {
_ = cancel.cancelled() => anyhow::bail!("Cancelled while downloading remote layers"),
_ = &mut cancelled => anyhow::bail!("Cancelled while downloading remote layers"),
res = downloads.next() => {
match res {
Some(Ok(())) => {},
@@ -925,7 +890,7 @@ impl Timeline {
new_state,
TimelineState::Stopping | TimelineState::Broken { .. }
) {
// drop the completion guard, if any; it might be holding off the completion
// drop the copmletion guard, if any; it might be holding off the completion
// forever needlessly
self.initial_logical_size_attempt
.lock()
@@ -2287,9 +2252,8 @@ impl Timeline {
let mut timeline_owned;
let mut timeline = self;
let mut read_count = scopeguard::guard(0, |cnt| {
crate::metrics::READ_NUM_FS_LAYERS.observe(cnt as f64)
});
let mut read_count =
scopeguard::guard(0, |cnt| self.metrics.read_num_fs_layers.observe(cnt as f64));
// For debugging purposes, collect the path of layers that we traversed
// through. It's included in the error message if we fail to find the key.
@@ -2423,15 +2387,12 @@ impl Timeline {
// Get all the data needed to reconstruct the page version from this layer.
// But if we have an older cached page image, no need to go past that.
let lsn_floor = max(cached_lsn + 1, start_lsn);
result = match open_layer
.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
ctx,
)
.await
{
result = match open_layer.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
ctx,
) {
Ok(result) => result,
Err(e) => return Err(PageReconstructError::from(e)),
};
@@ -2453,15 +2414,12 @@ impl Timeline {
if cont_lsn > start_lsn {
//info!("CHECKING for {} at {} on frozen layer {}", key, cont_lsn, frozen_layer.filename().display());
let lsn_floor = max(cached_lsn + 1, start_lsn);
result = match frozen_layer
.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
ctx,
)
.await
{
result = match frozen_layer.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
ctx,
) {
Ok(result) => result,
Err(e) => return Err(PageReconstructError::from(e)),
};
@@ -2492,15 +2450,12 @@ impl Timeline {
// Get all the data needed to reconstruct the page version from this layer.
// But if we have an older cached page image, no need to go past that.
let lsn_floor = max(cached_lsn + 1, lsn_floor);
result = match layer
.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
ctx,
)
.await
{
result = match layer.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
ctx,
) {
Ok(result) => result,
Err(e) => return Err(PageReconstructError::from(e)),
};
@@ -3497,7 +3452,7 @@ impl Timeline {
let mut prev: Option<Key> = None;
for (next_key, _next_lsn, _size) in itertools::process_results(
deltas_to_compact.iter().map(|l| l.key_iter(ctx)),
|iter_iter| iter_iter.kmerge_by(|a, b| a.0 < b.0),
|iter_iter| iter_iter.kmerge_by(|a, b| a.0 <= b.0),
)? {
if let Some(prev_key) = prev {
// just first fast filter
@@ -3537,7 +3492,11 @@ impl Timeline {
iter_iter.kmerge_by(|a, b| {
if let Ok((a_key, a_lsn, _)) = a {
if let Ok((b_key, b_lsn, _)) = b {
(a_key, a_lsn) < (b_key, b_lsn)
match a_key.cmp(b_key) {
Ordering::Less => true,
Ordering::Equal => a_lsn <= b_lsn,
Ordering::Greater => false,
}
} else {
false
}
@@ -3555,7 +3514,11 @@ impl Timeline {
iter_iter.kmerge_by(|a, b| {
let (a_key, a_lsn, _) = a;
let (b_key, b_lsn, _) = b;
(a_key, a_lsn) < (b_key, b_lsn)
match a_key.cmp(b_key) {
Ordering::Less => true,
Ordering::Equal => a_lsn <= b_lsn,
Ordering::Greater => false,
}
})
},
)?;

View File

@@ -149,10 +149,12 @@ impl OpenFiles {
// old file.
//
if let Some(old_file) = slot_guard.file.take() {
// the normal path of dropping VirtualFile uses "close", use "close-by-replace" here to
// distinguish the two.
// We do not have information about tenant_id/timeline_id of evicted file.
// It is possible to store path together with file or use filepath crate,
// but as far as close() is not expected to be fast, it is not so critical to gather
// precise per-tenant statistic here.
STORAGE_IO_TIME
.with_label_values(&["close-by-replace"])
.with_label_values(&["close", "-", "-"])
.observe_closure_duration(|| drop(old_file));
}
@@ -206,7 +208,7 @@ impl VirtualFile {
}
let (handle, mut slot_guard) = get_open_files().find_victim_slot();
let file = STORAGE_IO_TIME
.with_label_values(&["open"])
.with_label_values(&["open", &tenant_id, &timeline_id])
.observe_closure_duration(|| open_options.open(path))?;
// Strip all options other than read and write.
@@ -269,7 +271,7 @@ impl VirtualFile {
// Found a cached file descriptor.
slot.recently_used.store(true, Ordering::Relaxed);
return Ok(STORAGE_IO_TIME
.with_label_values(&[op])
.with_label_values(&[op, &self.tenant_id, &self.timeline_id])
.observe_closure_duration(|| func(file)));
}
}
@@ -296,12 +298,12 @@ impl VirtualFile {
// Open the physical file
let file = STORAGE_IO_TIME
.with_label_values(&["open"])
.with_label_values(&["open", &self.tenant_id, &self.timeline_id])
.observe_closure_duration(|| self.open_options.open(&self.path))?;
// Perform the requested operation on it
let result = STORAGE_IO_TIME
.with_label_values(&[op])
.with_label_values(&[op, &self.tenant_id, &self.timeline_id])
.observe_closure_duration(|| func(&file));
// Store the File in the slot and update the handle in the VirtualFile
@@ -331,11 +333,13 @@ impl Drop for VirtualFile {
let mut slot_guard = slot.inner.write().unwrap();
if slot_guard.tag == handle.tag {
slot.recently_used.store(false, Ordering::Relaxed);
// there is also operation "close-by-replace" for closes done on eviction for
// comparison.
// Unlike files evicted by replacement algorithm, here
// we group close time by tenant_id/timeline_id.
// At allows to compare number/time of "normal" file closes
// with file eviction.
STORAGE_IO_TIME
.with_label_values(&["close"])
.observe_closure_duration(|| drop(slot_guard.file.take()));
.with_label_values(&["close", &self.tenant_id, &self.timeline_id])
.observe_closure_duration(|| slot_guard.file.take());
}
}
}

View File

@@ -48,14 +48,6 @@ impl ClientCredentials<'_> {
}
impl<'a> ClientCredentials<'a> {
#[cfg(test)]
pub fn new_noop() -> Self {
ClientCredentials {
user: "",
project: None,
}
}
pub fn parse(
params: &'a StartupMessageParams,
sni: Option<&str>,

View File

@@ -1,9 +1,5 @@
//! A group of high-level tests for connection establishing logic and auth.
use std::borrow::Cow;
use super::*;
use crate::auth::ClientCredentials;
use crate::console::{CachedNodeInfo, NodeInfo};
use crate::{auth, sasl, scram};
use async_trait::async_trait;
use rstest::rstest;
@@ -308,148 +304,3 @@ fn connect_compute_total_wait() {
assert!(total_wait < tokio::time::Duration::from_secs(12));
assert!(total_wait > tokio::time::Duration::from_secs(10));
}
#[derive(Clone, Copy)]
enum ConnectAction {
Connect,
Retry,
Fail,
}
struct TestConnectMechanism {
counter: Arc<std::sync::Mutex<usize>>,
sequence: Vec<ConnectAction>,
}
impl TestConnectMechanism {
fn new(sequence: Vec<ConnectAction>) -> Self {
Self {
counter: Arc::new(std::sync::Mutex::new(0)),
sequence,
}
}
}
#[derive(Debug)]
struct TestConnection;
#[derive(Debug)]
struct TestConnectError {
retryable: bool,
}
impl std::fmt::Display for TestConnectError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self)
}
}
impl std::error::Error for TestConnectError {}
impl ShouldRetry for TestConnectError {
fn could_retry(&self) -> bool {
self.retryable
}
}
#[async_trait]
impl ConnectMechanism for TestConnectMechanism {
type Connection = TestConnection;
type ConnectError = TestConnectError;
type Error = anyhow::Error;
async fn connect_once(
&self,
_node_info: &console::CachedNodeInfo,
_timeout: time::Duration,
) -> Result<Self::Connection, Self::ConnectError> {
let mut counter = self.counter.lock().unwrap();
let action = self.sequence[*counter];
*counter += 1;
match action {
ConnectAction::Connect => Ok(TestConnection),
ConnectAction::Retry => Err(TestConnectError { retryable: true }),
ConnectAction::Fail => Err(TestConnectError { retryable: false }),
}
}
fn update_connect_config(&self, _conf: &mut compute::ConnCfg) {}
}
fn helper_create_connect_info() -> (
CachedNodeInfo,
console::ConsoleReqExtra<'static>,
auth::BackendType<'static, ClientCredentials<'static>>,
) {
let node = NodeInfo {
config: compute::ConnCfg::new(),
aux: Default::default(),
allow_self_signed_compute: false,
};
let cache = CachedNodeInfo::new_uncached(node);
let extra = console::ConsoleReqExtra {
session_id: uuid::Uuid::new_v4(),
application_name: Some("TEST"),
};
let url = "https://TEST_URL".parse().unwrap();
let api = console::provider::mock::Api::new(url);
let creds = auth::BackendType::Postgres(Cow::Owned(api), ClientCredentials::new_noop());
(cache, extra, creds)
}
#[tokio::test]
async fn connect_to_compute_success() {
use ConnectAction::*;
let mechanism = TestConnectMechanism::new(vec![Connect]);
let (cache, extra, creds) = helper_create_connect_info();
connect_to_compute(&mechanism, cache, &extra, &creds)
.await
.unwrap();
}
#[tokio::test]
async fn connect_to_compute_retry() {
use ConnectAction::*;
let mechanism = TestConnectMechanism::new(vec![Retry, Retry, Connect]);
let (cache, extra, creds) = helper_create_connect_info();
connect_to_compute(&mechanism, cache, &extra, &creds)
.await
.unwrap();
}
/// Test that we don't retry if the error is not retryable.
#[tokio::test]
async fn connect_to_compute_non_retry_1() {
use ConnectAction::*;
let mechanism = TestConnectMechanism::new(vec![Retry, Retry, Fail]);
let (cache, extra, creds) = helper_create_connect_info();
connect_to_compute(&mechanism, cache, &extra, &creds)
.await
.unwrap_err();
}
/// Even for non-retryable errors, we should retry at least once.
#[tokio::test]
async fn connect_to_compute_non_retry_2() {
use ConnectAction::*;
let mechanism = TestConnectMechanism::new(vec![Fail, Retry, Connect]);
let (cache, extra, creds) = helper_create_connect_info();
connect_to_compute(&mechanism, cache, &extra, &creds)
.await
.unwrap();
}
/// Retry for at most `NUM_RETRIES_CONNECT` times.
#[tokio::test]
async fn connect_to_compute_non_retry_3() {
assert_eq!(NUM_RETRIES_CONNECT, 10);
use ConnectAction::*;
let mechanism = TestConnectMechanism::new(vec![
Retry, Retry, Retry, Retry, Retry, Retry, Retry, Retry, Retry, Retry,
/* the 11th time */ Retry,
]);
let (cache, extra, creds) = helper_create_connect_info();
connect_to_compute(&mechanism, cache, &extra, &creds)
.await
.unwrap_err();
}

View File

@@ -40,13 +40,10 @@ def parse_metrics(text: str, name: str = "") -> Metrics:
return metrics
def histogram(prefix_without_trailing_underscore: str) -> List[str]:
assert not prefix_without_trailing_underscore.endswith("_")
return [f"{prefix_without_trailing_underscore}_{x}" for x in ["bucket", "count", "sum"]]
PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS: Tuple[str, ...] = (
"pageserver_remote_timeline_client_calls_unfinished",
*[f"pageserver_remote_timeline_client_calls_started_{x}" for x in ["bucket", "count", "sum"]],
*[f"pageserver_remote_operation_seconds_{x}" for x in ["bucket", "count", "sum"]],
"pageserver_remote_physical_size",
"pageserver_remote_timeline_client_bytes_started_total",
"pageserver_remote_timeline_client_bytes_finished_total",
@@ -70,29 +67,34 @@ PAGESERVER_GLOBAL_METRICS: Tuple[str, ...] = (
"pageserver_getpage_reconstruct_seconds_count",
"pageserver_getpage_reconstruct_seconds_sum",
*[f"pageserver_basebackup_query_seconds_{x}" for x in ["bucket", "count", "sum"]],
*histogram("pageserver_read_num_fs_layers"),
*histogram("pageserver_getpage_get_reconstruct_data_seconds"),
*histogram("pageserver_wait_lsn_seconds"),
*histogram("pageserver_remote_operation_seconds"),
*histogram("pageserver_remote_timeline_client_calls_started"),
*histogram("pageserver_io_operations_seconds"),
"pageserver_tenant_states_count",
)
PAGESERVER_PER_TENANT_METRICS: Tuple[str, ...] = (
"pageserver_current_logical_size",
"pageserver_resident_physical_size",
"pageserver_getpage_get_reconstruct_data_seconds_bucket",
"pageserver_getpage_get_reconstruct_data_seconds_count",
"pageserver_getpage_get_reconstruct_data_seconds_sum",
"pageserver_io_operations_bytes_total",
"pageserver_io_operations_seconds_bucket",
"pageserver_io_operations_seconds_count",
"pageserver_io_operations_seconds_sum",
"pageserver_last_record_lsn",
"pageserver_read_num_fs_layers_bucket",
"pageserver_read_num_fs_layers_count",
"pageserver_read_num_fs_layers_sum",
"pageserver_smgr_query_seconds_bucket",
"pageserver_smgr_query_seconds_count",
"pageserver_smgr_query_seconds_sum",
"pageserver_storage_operations_seconds_count_total",
"pageserver_storage_operations_seconds_sum_total",
"pageserver_wait_lsn_seconds_bucket",
"pageserver_wait_lsn_seconds_count",
"pageserver_wait_lsn_seconds_sum",
"pageserver_created_persistent_files_total",
"pageserver_written_persistent_bytes_total",
"pageserver_tenant_states_count",
"pageserver_evictions_total",
"pageserver_evictions_with_low_residence_duration_total",
*PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS,
# pageserver_broken_tenants_count is a leaked "metric" which is "cleared" on restart or reload
)

View File

@@ -136,6 +136,8 @@ def test_gc_index_upload(neon_env_builder: NeonEnvBuilder, remote_storage_kind:
for sample in ps_metrics.query_all(
name="pageserver_remote_operation_seconds_count",
filter={
"tenant_id": str(tenant_id),
"timeline_id": str(timeline_id),
"file_kind": str(file_kind),
"op_kind": str(op_kind),
},

View File

@@ -140,6 +140,8 @@ def test_metric_collection(
for sample in ps_metrics.query_all(
name="pageserver_remote_operation_seconds_count",
filter={
"tenant_id": str(tenant_id),
"timeline_id": str(timeline_id),
"file_kind": str(file_kind),
"op_kind": str(op_kind),
},

View File

@@ -8,10 +8,6 @@ from fixtures.utils import query_scalar
# Now this test is very minimalistic -
# it only checks next_multixact_id field in restored pg_control,
# since we don't have functions to check multixact internals.
# We do check that the datadir contents exported from the
# pageserver match what the running PostgreSQL produced. This
# is enough to verify that the WAL records are handled correctly
# in the pageserver.
#
def test_multixact(neon_simple_env: NeonEnv, test_output_dir):
env = neon_simple_env
@@ -22,8 +18,8 @@ def test_multixact(neon_simple_env: NeonEnv, test_output_dir):
cur = endpoint.connect().cursor()
cur.execute(
"""
CREATE TABLE t1(i int primary key, n_updated int);
INSERT INTO t1 select g, 0 from generate_series(1, 50) g;
CREATE TABLE t1(i int primary key);
INSERT INTO t1 select * from generate_series(1, 100);
"""
)
@@ -33,7 +29,6 @@ def test_multixact(neon_simple_env: NeonEnv, test_output_dir):
# Lock entries using parallel connections in a round-robin fashion.
nclients = 20
update_every = 97
connections = []
for _ in range(nclients):
# Do not turn on autocommit. We want to hold the key-share locks.
@@ -41,20 +36,14 @@ def test_multixact(neon_simple_env: NeonEnv, test_output_dir):
connections.append(conn)
# On each iteration, we commit the previous transaction on a connection,
# and issue another select. Each SELECT generates a new multixact that
# and issue antoher select. Each SELECT generates a new multixact that
# includes the new XID, and the XIDs of all the other parallel transactions.
# This generates enough traffic on both multixact offsets and members SLRUs
# to cross page boundaries.
for i in range(20000):
for i in range(5000):
conn = connections[i % nclients]
conn.commit()
# Perform some non-key UPDATEs too, to exercise different multixact
# member statuses.
if i % update_every == 0:
conn.cursor().execute(f"update t1 set n_updated = n_updated + 1 where i = {i % 50}")
else:
conn.cursor().execute("select * from t1 for key share")
conn.cursor().execute("select * from t1 for key share")
# We have multixacts now. We can close the connections.
for c in connections:

View File

@@ -27,16 +27,15 @@ from fixtures.types import Lsn
from fixtures.utils import query_scalar, wait_until
def get_num_downloaded_layers(client: PageserverHttpClient):
"""
This assumes that the pageserver only has a single tenant.
"""
def get_num_downloaded_layers(client: PageserverHttpClient, tenant_id, timeline_id):
value = client.get_metric_value(
"pageserver_remote_operation_seconds_count",
{
"file_kind": "layer",
"op_kind": "download",
"status": "success",
"tenant_id": tenant_id,
"timeline_id": timeline_id,
},
)
if value is None:
@@ -58,8 +57,7 @@ def test_ondemand_download_large_rel(
test_name="test_ondemand_download_large_rel",
)
# thinking about using a shared environment? the test assumes that global
# metrics are for single tenant.
##### First start, insert secret data and upload it to the remote storage
env = neon_env_builder.init_start(
initial_tenant_conf={
# disable background GC
@@ -131,7 +129,7 @@ def test_ondemand_download_large_rel(
# safekeepers, that have now been shut down.
endpoint = env.endpoints.create_start("main", lsn=current_lsn)
before_downloads = get_num_downloaded_layers(client)
before_downloads = get_num_downloaded_layers(client, tenant_id, timeline_id)
assert before_downloads != 0, "basebackup should on-demand non-zero layers"
# Probe in the middle of the table. There's a high chance that the beginning
@@ -142,7 +140,7 @@ def test_ondemand_download_large_rel(
with endpoint.cursor() as cur:
assert query_scalar(cur, "select count(*) from tbl where id = 500000") == 1
after_downloads = get_num_downloaded_layers(client)
after_downloads = get_num_downloaded_layers(client, tenant_id, timeline_id)
log.info(f"layers downloaded before {before_downloads} and after {after_downloads}")
assert after_downloads > before_downloads
@@ -161,11 +159,13 @@ def test_ondemand_download_timetravel(
test_name="test_ondemand_download_timetravel",
)
# thinking about using a shared environment? the test assumes that global
# metrics are for single tenant.
##### First start, insert data and upload it to the remote storage
env = neon_env_builder.init_start()
pageserver_http = env.pageserver.http_client()
env = neon_env_builder.init_start(
initial_tenant_conf={
# Override defaults, to create more layers
tenant, _ = env.neon_cli.create_tenant(
conf={
# Disable background GC & compaction
# We don't want GC, that would break the assertion about num downloads.
# We don't want background compaction, we force a compaction every time we do explicit checkpoint.
@@ -178,7 +178,7 @@ def test_ondemand_download_timetravel(
"compaction_target_size": f"{1 * 1024 ** 2}", # 1 MB
}
)
pageserver_http = env.pageserver.http_client()
env.initial_tenant = tenant
endpoint = env.endpoints.create_start("main")
@@ -283,7 +283,7 @@ def test_ondemand_download_timetravel(
== table_len
)
after_downloads = get_num_downloaded_layers(client)
after_downloads = get_num_downloaded_layers(client, tenant_id, timeline_id)
num_layers_downloaded.append(after_downloads)
log.info(f"num_layers_downloaded[-1]={num_layers_downloaded[-1]}")
@@ -324,8 +324,11 @@ def test_download_remote_layers_api(
)
##### First start, insert data and upload it to the remote storage
env = neon_env_builder.init_start(
initial_tenant_conf={
env = neon_env_builder.init_start()
# Override defaults, to create more layers
tenant, _ = env.neon_cli.create_tenant(
conf={
# Disable background GC & compaction
# We don't want GC, that would break the assertion about num downloads.
# We don't want background compaction, we force a compaction every time we do explicit checkpoint.
@@ -338,6 +341,7 @@ def test_download_remote_layers_api(
"compaction_target_size": f"{1 * 1024 ** 2}", # 1 MB
}
)
env.initial_tenant = tenant
endpoint = env.endpoints.create_start("main")
@@ -485,6 +489,8 @@ def test_compaction_downloads_on_demand_without_image_creation(
test_name="test_compaction_downloads_on_demand_without_image_creation",
)
env = neon_env_builder.init_start()
conf = {
# Disable background GC & compaction
"gc_period": "0s",
@@ -500,8 +506,6 @@ def test_compaction_downloads_on_demand_without_image_creation(
# pitr_interval and gc_horizon are not interesting because we dont run gc
}
env = neon_env_builder.init_start(initial_tenant_conf=stringify(conf))
def downloaded_bytes_and_count(pageserver_http: PageserverHttpClient) -> Tuple[int, int]:
m = pageserver_http.get_metrics()
# these are global counters
@@ -513,12 +517,11 @@ def test_compaction_downloads_on_demand_without_image_creation(
assert count < 2**53 and count.is_integer(), "count should still be safe integer-in-f64"
return (int(total_bytes), int(count))
# Override defaults, to create more layers
tenant_id, timeline_id = env.neon_cli.create_tenant(conf=stringify(conf))
env.initial_tenant = tenant_id
pageserver_http = env.pageserver.http_client()
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
assert timeline_id is not None
with env.endpoints.create_start("main") as endpoint:
# no particular reason to create the layers like this, but we are sure
# not to hit the image_creation_threshold here.
@@ -574,6 +577,8 @@ def test_compaction_downloads_on_demand_with_image_creation(
test_name="test_compaction_downloads_on_demand",
)
env = neon_env_builder.init_start()
conf = {
# Disable background GC & compaction
"gc_period": "0s",
@@ -588,11 +593,9 @@ def test_compaction_downloads_on_demand_with_image_creation(
# pitr_interval and gc_horizon are not interesting because we dont run gc
}
env = neon_env_builder.init_start(initial_tenant_conf=stringify(conf))
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
assert timeline_id is not None
# Override defaults, to create more layers
tenant_id, timeline_id = env.neon_cli.create_tenant(conf=stringify(conf))
env.initial_tenant = tenant_id
pageserver_http = env.pageserver.http_client()
endpoint = env.endpoints.create_start("main")
@@ -661,6 +664,10 @@ def test_compaction_downloads_on_demand_with_image_creation(
assert dict(kinds_after) == {"Delta": 4, "Image": 1}
def stringify(conf: Dict[str, Any]) -> Dict[str, str]:
return dict(map(lambda x: (x[0], str(x[1])), conf.items()))
@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS])
def test_ondemand_download_failure_to_replace(
neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind
@@ -684,10 +691,9 @@ def test_ondemand_download_failure_to_replace(
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
assert timeline_id is not None
tenant_id, timeline_id = env.neon_cli.create_tenant()
env.initial_tenant = tenant_id
pageserver_http = env.pageserver.http_client()
lsn = Lsn(pageserver_http.timeline_detail(tenant_id, timeline_id)["last_record_lsn"])
@@ -718,7 +724,3 @@ def test_ondemand_download_failure_to_replace(
env.pageserver.allowed_errors.append(".* ERROR .*Task 'initial size calculation'")
# if the above returned, then we didn't have a livelock, and all is well
def stringify(conf: Dict[str, Any]) -> Dict[str, str]:
return dict(map(lambda x: (x[0], str(x[1])), conf.items()))

View File

@@ -378,10 +378,12 @@ def test_remote_timeline_client_calls_started_metric(
test_name="test_remote_timeline_client_metrics",
)
# thinking about using a shared environment? the test assumes that global
# metrics are for single tenant.
env = neon_env_builder.init_start(
initial_tenant_conf={
env = neon_env_builder.init_start()
# create tenant with config that will determinstically allow
# compaction and gc
tenant_id, timeline_id = env.neon_cli.create_tenant(
conf={
# small checkpointing and compaction targets to ensure we generate many upload operations
"checkpoint_distance": f"{128 * 1024}",
"compaction_threshold": "1",
@@ -396,10 +398,6 @@ def test_remote_timeline_client_calls_started_metric(
}
)
tenant_id = env.initial_tenant
assert env.initial_timeline is not None
timeline_id: TimelineId = env.initial_timeline
client = env.pageserver.http_client()
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
@@ -421,7 +419,6 @@ def test_remote_timeline_client_calls_started_metric(
"VACUUM foo",
]
)
assert timeline_id is not None
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
calls_started: Dict[Tuple[str, str], List[int]] = {
@@ -431,14 +428,13 @@ def test_remote_timeline_client_calls_started_metric(
}
def fetch_calls_started():
assert timeline_id is not None
for (file_kind, op_kind), observations in calls_started.items():
val = client.get_metric_value(
name="pageserver_remote_timeline_client_calls_started_count",
filter={
"file_kind": str(file_kind),
"op_kind": str(op_kind),
},
val = client.get_remote_timeline_client_metric(
"pageserver_remote_timeline_client_calls_started_count",
tenant_id,
timeline_id,
file_kind,
op_kind,
)
assert val is not None, f"expecting metric to be present: {file_kind} {op_kind}"
val = int(val)
@@ -522,8 +518,12 @@ def test_timeline_deletion_with_files_stuck_in_upload_queue(
test_name="test_timeline_deletion_with_files_stuck_in_upload_queue",
)
env = neon_env_builder.init_start(
initial_tenant_conf={
env = neon_env_builder.init_start()
# create tenant with config that will determinstically allow
# compaction and gc
tenant_id, timeline_id = env.neon_cli.create_tenant(
conf={
# small checkpointing and compaction targets to ensure we generate many operations
"checkpoint_distance": f"{64 * 1024}",
"compaction_threshold": "1",
@@ -535,10 +535,6 @@ def test_timeline_deletion_with_files_stuck_in_upload_queue(
"pitr_interval": "0s",
}
)
tenant_id = env.initial_tenant
assert env.initial_timeline is not None
timeline_id: TimelineId = env.initial_timeline
timeline_path = env.timeline_dir(tenant_id, timeline_id)
client = env.pageserver.http_client()
@@ -791,8 +787,12 @@ def test_compaction_delete_before_upload(
test_name="test_compaction_delete_before_upload",
)
env = neon_env_builder.init_start(
initial_tenant_conf={
env = neon_env_builder.init_start()
# create tenant with config that will determinstically allow
# compaction and disables gc
tenant_id, timeline_id = env.neon_cli.create_tenant(
conf={
# Set a small compaction threshold
"compaction_threshold": "3",
# Disable GC
@@ -802,10 +802,6 @@ def test_compaction_delete_before_upload(
}
)
tenant_id = env.initial_tenant
assert env.initial_timeline is not None
timeline_id: TimelineId = env.initial_timeline
client = env.pageserver.http_client()
with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint:

View File

@@ -2,7 +2,6 @@ import asyncio
import random
import time
from threading import Thread
from typing import List, Optional
import asyncpg
import pytest
@@ -22,7 +21,6 @@ from fixtures.pageserver.utils import (
)
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import query_scalar, wait_until
from prometheus_client.samples import Sample
def do_gc_target(
@@ -856,89 +854,3 @@ def ensure_test_data(data_id: int, data: str, endpoint: Endpoint):
assert (
query_scalar(cur, f"SELECT secret FROM test WHERE id = {data_id};") == data
), "Should have timeline data back"
@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS])
def test_metrics_while_ignoring_broken_tenant_and_reloading(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
):
neon_env_builder.enable_remote_storage(
remote_storage_kind=remote_storage_kind,
test_name="test_metrics_while_ignoring_broken_tenant_and_reloading",
)
env = neon_env_builder.init_start()
client = env.pageserver.http_client()
env.pageserver.allowed_errors.append(
r".* Changing Active tenant to Broken state, reason: broken from test"
)
def only_int(samples: List[Sample]) -> Optional[int]:
if len(samples) == 1:
return int(samples[0].value)
assert len(samples) == 0
return None
wait_until_tenant_state(client, env.initial_tenant, "Active", 10, 0.5)
client.tenant_break(env.initial_tenant)
found_broken = False
active, broken, broken_set = ([], [], [])
for _ in range(10):
m = client.get_metrics()
active = m.query_all("pageserver_tenant_states_count", {"state": "Active"})
broken = m.query_all("pageserver_tenant_states_count", {"state": "Broken"})
broken_set = m.query_all(
"pageserver_broken_tenants_count", {"tenant_id": str(env.initial_tenant)}
)
found_broken = only_int(active) == 0 and only_int(broken) == 1 and only_int(broken_set) == 1
if found_broken:
break
log.info(f"active: {active}, broken: {broken}, broken_set: {broken_set}")
time.sleep(0.5)
assert (
found_broken
), f"tenant shows up as broken; active={active}, broken={broken}, broken_set={broken_set}"
client.tenant_ignore(env.initial_tenant)
found_broken = False
broken, broken_set = ([], [])
for _ in range(10):
m = client.get_metrics()
broken = m.query_all("pageserver_tenant_states_count", {"state": "Broken"})
broken_set = m.query_all(
"pageserver_broken_tenants_count", {"tenant_id": str(env.initial_tenant)}
)
found_broken = only_int(broken) == 0 and only_int(broken_set) == 1
if found_broken:
break
time.sleep(0.5)
assert (
found_broken
), f"broken should still be in set, but it is not in the tenant state count: broken={broken}, broken_set={broken_set}"
client.tenant_load(env.initial_tenant)
found_active = False
active, broken_set = ([], [])
for _ in range(10):
m = client.get_metrics()
active = m.query_all("pageserver_tenant_states_count", {"state": "Active"})
broken_set = m.query_all(
"pageserver_broken_tenants_count", {"tenant_id": str(env.initial_tenant)}
)
found_active = only_int(active) == 1 and len(broken_set) == 0
if found_active:
break
time.sleep(0.5)
assert (
found_active
), f"reloaded tenant should be active, and broken tenant set item removed: active={active}, broken_set={broken_set}"

View File

@@ -213,9 +213,6 @@ def test_metrics_normal_work(neon_env_builder: NeonEnvBuilder):
# Test (a subset of) pageserver global metrics
for metric in PAGESERVER_GLOBAL_METRICS:
if metric.startswith("pageserver_remote"):
continue
ps_samples = ps_metrics.query_all(metric, {})
assert len(ps_samples) > 0, f"expected at least one sample for {metric}"
for sample in ps_samples:
@@ -383,8 +380,10 @@ def test_pageserver_with_empty_tenants(
ps_metrics = client.get_metrics()
broken_tenants_metric_filter = {
"tenant_id": str(tenant_without_timelines_dir),
"state": "Broken",
}
active_tenants_metric_filter = {
"tenant_id": str(tenant_with_empty_timelines),
"state": "Active",
}
@@ -400,7 +399,7 @@ def test_pageserver_with_empty_tenants(
tenant_broken_count = int(
ps_metrics.query_one(
"pageserver_broken_tenants_count", filter=broken_tenants_metric_filter
"pageserver_tenant_states_count", filter=broken_tenants_metric_filter
).value
)