mirror of
https://github.com/neondatabase/neon.git
synced 2026-02-10 14:10:37 +00:00
Compare commits
1 Commits
test_pgvec
...
no-sync-sa
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2f7c2b9826 |
3
Cargo.lock
generated
3
Cargo.lock
generated
@@ -2506,7 +2506,6 @@ dependencies = [
|
||||
"pageserver",
|
||||
"postgres_ffi",
|
||||
"svg_fmt",
|
||||
"tokio",
|
||||
"utils",
|
||||
"workspace_hack",
|
||||
]
|
||||
@@ -2545,7 +2544,6 @@ dependencies = [
|
||||
"metrics",
|
||||
"nix",
|
||||
"num-traits",
|
||||
"num_cpus",
|
||||
"once_cell",
|
||||
"pageserver_api",
|
||||
"pin-project-lite",
|
||||
@@ -4869,7 +4867,6 @@ dependencies = [
|
||||
"tempfile",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tracing",
|
||||
"tracing-error",
|
||||
"tracing-subscriber",
|
||||
|
||||
@@ -199,8 +199,8 @@ RUN wget https://github.com/df7cb/postgresql-unit/archive/refs/tags/7.7.tar.gz -
|
||||
FROM build-deps AS vector-pg-build
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
# Use custom branch with HNSW index support
|
||||
RUN wget https://github.com/pgvector/pgvector/archive/refs/heads/hnsw.tar.gz -O pgvector.tar.gz && \
|
||||
RUN wget https://github.com/pgvector/pgvector/archive/refs/tags/v0.4.4.tar.gz -O pgvector.tar.gz && \
|
||||
echo "1cb70a63f8928e396474796c22a20be9f7285a8a013009deb8152445b61b72e6 pgvector.tar.gz" | sha256sum --check && \
|
||||
mkdir pgvector-src && cd pgvector-src && tar xvzf ../pgvector.tar.gz --strip-components=1 -C . && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
|
||||
|
||||
@@ -223,8 +223,9 @@ fn main() -> Result<()> {
|
||||
drop(state);
|
||||
|
||||
// Launch remaining service threads
|
||||
let _monitor_handle = launch_monitor(&compute);
|
||||
let _configurator_handle = launch_configurator(&compute);
|
||||
let _monitor_handle = launch_monitor(&compute).expect("cannot launch compute monitor thread");
|
||||
let _configurator_handle =
|
||||
launch_configurator(&compute).expect("cannot launch configurator thread");
|
||||
|
||||
// Start Postgres
|
||||
let mut delay_exit = false;
|
||||
|
||||
@@ -372,9 +372,13 @@ impl ComputeNode {
|
||||
let lsn = match spec.mode {
|
||||
ComputeMode::Primary => {
|
||||
info!("starting safekeepers syncing");
|
||||
let lsn = self
|
||||
let lsn = if let Some(lsn) = self.check_safekeepers_synced() {
|
||||
lsn
|
||||
} else {
|
||||
self
|
||||
.sync_safekeepers(pspec.storage_auth_token.clone())
|
||||
.with_context(|| "failed to sync safekeepers")?;
|
||||
};
|
||||
info!("safekeepers synced at LSN {}", lsn);
|
||||
lsn
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use std::sync::Arc;
|
||||
use std::thread;
|
||||
|
||||
use anyhow::Result;
|
||||
use tracing::{error, info, instrument};
|
||||
|
||||
use compute_api::responses::ComputeStatus;
|
||||
@@ -41,14 +42,13 @@ fn configurator_main_loop(compute: &Arc<ComputeNode>) {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn launch_configurator(compute: &Arc<ComputeNode>) -> thread::JoinHandle<()> {
|
||||
pub fn launch_configurator(compute: &Arc<ComputeNode>) -> Result<thread::JoinHandle<()>> {
|
||||
let compute = Arc::clone(compute);
|
||||
|
||||
thread::Builder::new()
|
||||
Ok(thread::Builder::new()
|
||||
.name("compute-configurator".into())
|
||||
.spawn(move || {
|
||||
configurator_main_loop(&compute);
|
||||
info!("configurator thread is exited");
|
||||
})
|
||||
.expect("cannot launch configurator thread")
|
||||
})?)
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use std::sync::Arc;
|
||||
use std::{thread, time};
|
||||
|
||||
use anyhow::Result;
|
||||
use chrono::{DateTime, Utc};
|
||||
use postgres::{Client, NoTls};
|
||||
use tracing::{debug, info};
|
||||
@@ -104,11 +105,10 @@ fn watch_compute_activity(compute: &ComputeNode) {
|
||||
}
|
||||
|
||||
/// Launch a separate compute monitor thread and return its `JoinHandle`.
|
||||
pub fn launch_monitor(state: &Arc<ComputeNode>) -> thread::JoinHandle<()> {
|
||||
pub fn launch_monitor(state: &Arc<ComputeNode>) -> Result<thread::JoinHandle<()>> {
|
||||
let state = Arc::clone(state);
|
||||
|
||||
thread::Builder::new()
|
||||
Ok(thread::Builder::new()
|
||||
.name("compute-monitor".into())
|
||||
.spawn(move || watch_compute_activity(&state))
|
||||
.expect("cannot launch compute monitor thread")
|
||||
.spawn(move || watch_compute_activity(&state))?)
|
||||
}
|
||||
|
||||
@@ -6,7 +6,6 @@ use once_cell::sync::Lazy;
|
||||
use prometheus::core::{AtomicU64, Collector, GenericGauge, GenericGaugeVec};
|
||||
pub use prometheus::opts;
|
||||
pub use prometheus::register;
|
||||
pub use prometheus::Error;
|
||||
pub use prometheus::{core, default_registry, proto};
|
||||
pub use prometheus::{exponential_buckets, linear_buckets};
|
||||
pub use prometheus::{register_counter_vec, Counter, CounterVec};
|
||||
|
||||
@@ -57,9 +57,9 @@ pub fn slru_may_delete_clogsegment(segpage: u32, cutoff_page: u32) -> bool {
|
||||
// Multixact utils
|
||||
|
||||
pub fn mx_offset_to_flags_offset(xid: MultiXactId) -> usize {
|
||||
((xid / pg_constants::MULTIXACT_MEMBERS_PER_MEMBERGROUP as u32)
|
||||
% pg_constants::MULTIXACT_MEMBERGROUPS_PER_PAGE as u32
|
||||
* pg_constants::MULTIXACT_MEMBERGROUP_SIZE as u32) as usize
|
||||
((xid / pg_constants::MULTIXACT_MEMBERS_PER_MEMBERGROUP as u32) as u16
|
||||
% pg_constants::MULTIXACT_MEMBERGROUPS_PER_PAGE
|
||||
* pg_constants::MULTIXACT_MEMBERGROUP_SIZE) as usize
|
||||
}
|
||||
|
||||
pub fn mx_offset_to_flags_bitshift(xid: MultiXactId) -> u16 {
|
||||
@@ -81,41 +81,3 @@ fn mx_offset_to_member_page(xid: u32) -> u32 {
|
||||
pub fn mx_offset_to_member_segment(xid: u32) -> i32 {
|
||||
(mx_offset_to_member_page(xid) / pg_constants::SLRU_PAGES_PER_SEGMENT) as i32
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_multixid_calc() {
|
||||
// Check that the mx_offset_* functions produce the same values as the
|
||||
// corresponding PostgreSQL C macros (MXOffsetTo*). These test values
|
||||
// were generated by calling the PostgreSQL macros with a little C
|
||||
// program.
|
||||
assert_eq!(mx_offset_to_member_segment(0), 0);
|
||||
assert_eq!(mx_offset_to_member_page(0), 0);
|
||||
assert_eq!(mx_offset_to_flags_offset(0), 0);
|
||||
assert_eq!(mx_offset_to_flags_bitshift(0), 0);
|
||||
assert_eq!(mx_offset_to_member_offset(0), 4);
|
||||
assert_eq!(mx_offset_to_member_segment(1), 0);
|
||||
assert_eq!(mx_offset_to_member_page(1), 0);
|
||||
assert_eq!(mx_offset_to_flags_offset(1), 0);
|
||||
assert_eq!(mx_offset_to_flags_bitshift(1), 8);
|
||||
assert_eq!(mx_offset_to_member_offset(1), 8);
|
||||
assert_eq!(mx_offset_to_member_segment(123456789), 2358);
|
||||
assert_eq!(mx_offset_to_member_page(123456789), 75462);
|
||||
assert_eq!(mx_offset_to_flags_offset(123456789), 4780);
|
||||
assert_eq!(mx_offset_to_flags_bitshift(123456789), 8);
|
||||
assert_eq!(mx_offset_to_member_offset(123456789), 4788);
|
||||
assert_eq!(mx_offset_to_member_segment(u32::MAX - 1), 82040);
|
||||
assert_eq!(mx_offset_to_member_page(u32::MAX - 1), 2625285);
|
||||
assert_eq!(mx_offset_to_flags_offset(u32::MAX - 1), 5160);
|
||||
assert_eq!(mx_offset_to_flags_bitshift(u32::MAX - 1), 16);
|
||||
assert_eq!(mx_offset_to_member_offset(u32::MAX - 1), 5172);
|
||||
assert_eq!(mx_offset_to_member_segment(u32::MAX), 82040);
|
||||
assert_eq!(mx_offset_to_member_page(u32::MAX), 2625285);
|
||||
assert_eq!(mx_offset_to_flags_offset(u32::MAX), 5160);
|
||||
assert_eq!(mx_offset_to_flags_bitshift(u32::MAX), 24);
|
||||
assert_eq!(mx_offset_to_member_offset(u32::MAX), 5176);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -42,10 +42,6 @@ workspace_hack.workspace = true
|
||||
|
||||
const_format.workspace = true
|
||||
|
||||
# to use tokio channels as streams, this is faster to compile than async_stream
|
||||
# why is it only here? no other crate should use it, streams are rarely needed.
|
||||
tokio-stream = { version = "0.1.14" }
|
||||
|
||||
[dev-dependencies]
|
||||
byteorder.workspace = true
|
||||
bytes.workspace = true
|
||||
|
||||
@@ -9,6 +9,7 @@ use metrics::{register_int_counter, Encoder, IntCounter, TextEncoder};
|
||||
use once_cell::sync::Lazy;
|
||||
use routerify::ext::RequestExt;
|
||||
use routerify::{Middleware, RequestInfo, Router, RouterBuilder};
|
||||
use tokio::task::JoinError;
|
||||
use tracing::{self, debug, info, info_span, warn, Instrument};
|
||||
|
||||
use std::future::Future;
|
||||
@@ -147,140 +148,26 @@ impl Drop for RequestCancelled {
|
||||
}
|
||||
|
||||
async fn prometheus_metrics_handler(_req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use std::io::Write as _;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
|
||||
SERVE_METRICS_COUNT.inc();
|
||||
|
||||
/// An [`std::io::Write`] implementation on top of a channel sending [`bytes::Bytes`] chunks.
|
||||
struct ChannelWriter {
|
||||
buffer: BytesMut,
|
||||
tx: mpsc::Sender<std::io::Result<Bytes>>,
|
||||
written: usize,
|
||||
}
|
||||
|
||||
impl ChannelWriter {
|
||||
fn new(buf_len: usize, tx: mpsc::Sender<std::io::Result<Bytes>>) -> Self {
|
||||
assert_ne!(buf_len, 0);
|
||||
ChannelWriter {
|
||||
// split about half off the buffer from the start, because we flush depending on
|
||||
// capacity. first flush will come sooner than without this, but now resizes will
|
||||
// have better chance of picking up the "other" half. not guaranteed of course.
|
||||
buffer: BytesMut::with_capacity(buf_len).split_off(buf_len / 2),
|
||||
tx,
|
||||
written: 0,
|
||||
}
|
||||
}
|
||||
|
||||
fn flush0(&mut self) -> std::io::Result<usize> {
|
||||
let n = self.buffer.len();
|
||||
if n == 0 {
|
||||
return Ok(0);
|
||||
}
|
||||
|
||||
tracing::trace!(n, "flushing");
|
||||
let ready = self.buffer.split().freeze();
|
||||
|
||||
// not ideal to call from blocking code to block_on, but we are sure that this
|
||||
// operation does not spawn_blocking other tasks
|
||||
let res: Result<(), ()> = tokio::runtime::Handle::current().block_on(async {
|
||||
self.tx.send(Ok(ready)).await.map_err(|_| ())?;
|
||||
|
||||
// throttle sending to allow reuse of our buffer in `write`.
|
||||
self.tx.reserve().await.map_err(|_| ())?;
|
||||
|
||||
// now the response task has picked up the buffer and hopefully started
|
||||
// sending it to the client.
|
||||
Ok(())
|
||||
});
|
||||
if res.is_err() {
|
||||
return Err(std::io::ErrorKind::BrokenPipe.into());
|
||||
}
|
||||
self.written += n;
|
||||
Ok(n)
|
||||
}
|
||||
|
||||
fn flushed_bytes(&self) -> usize {
|
||||
self.written
|
||||
}
|
||||
}
|
||||
|
||||
impl std::io::Write for ChannelWriter {
|
||||
fn write(&mut self, mut buf: &[u8]) -> std::io::Result<usize> {
|
||||
let remaining = self.buffer.capacity() - self.buffer.len();
|
||||
|
||||
let out_of_space = remaining < buf.len();
|
||||
|
||||
let original_len = buf.len();
|
||||
|
||||
if out_of_space {
|
||||
let can_still_fit = buf.len() - remaining;
|
||||
self.buffer.extend_from_slice(&buf[..can_still_fit]);
|
||||
buf = &buf[can_still_fit..];
|
||||
self.flush0()?;
|
||||
}
|
||||
|
||||
// assume that this will often under normal operation just move the pointer back to the
|
||||
// beginning of allocation, because previous split off parts are already sent and
|
||||
// dropped.
|
||||
self.buffer.extend_from_slice(buf);
|
||||
Ok(original_len)
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> std::io::Result<()> {
|
||||
self.flush0().map(|_| ())
|
||||
}
|
||||
}
|
||||
|
||||
let started_at = std::time::Instant::now();
|
||||
|
||||
let (tx, rx) = mpsc::channel(1);
|
||||
|
||||
let body = Body::wrap_stream(ReceiverStream::new(rx));
|
||||
|
||||
let mut writer = ChannelWriter::new(128 * 1024, tx);
|
||||
|
||||
let mut buffer = vec![];
|
||||
let encoder = TextEncoder::new();
|
||||
|
||||
let metrics = tokio::task::spawn_blocking(move || {
|
||||
// Currently we take a lot of mutexes while collecting metrics, so it's
|
||||
// better to spawn a blocking task to avoid blocking the event loop.
|
||||
metrics::gather()
|
||||
})
|
||||
.await
|
||||
.map_err(|e: JoinError| ApiError::InternalServerError(e.into()))?;
|
||||
encoder.encode(&metrics, &mut buffer).unwrap();
|
||||
|
||||
let response = Response::builder()
|
||||
.status(200)
|
||||
.header(CONTENT_TYPE, encoder.format_type())
|
||||
.body(body)
|
||||
.body(Body::from(buffer))
|
||||
.unwrap();
|
||||
|
||||
let span = info_span!("blocking");
|
||||
tokio::task::spawn_blocking(move || {
|
||||
let _span = span.entered();
|
||||
let metrics = metrics::gather();
|
||||
let res = encoder
|
||||
.encode(&metrics, &mut writer)
|
||||
.and_then(|_| writer.flush().map_err(|e| e.into()));
|
||||
|
||||
match res {
|
||||
Ok(()) => {
|
||||
tracing::info!(
|
||||
bytes = writer.flushed_bytes(),
|
||||
elapsed_ms = started_at.elapsed().as_millis(),
|
||||
"responded /metrics"
|
||||
);
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!("failed to write out /metrics response: {e:#}");
|
||||
// semantics of this error are quite... unclear. we want to error the stream out to
|
||||
// abort the response to somehow notify the client that we failed.
|
||||
//
|
||||
// though, most likely the reason for failure is that the receiver is already gone.
|
||||
drop(
|
||||
writer
|
||||
.tx
|
||||
.blocking_send(Err(std::io::ErrorKind::BrokenPipe.into())),
|
||||
);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
|
||||
@@ -35,8 +35,6 @@ humantime-serde.workspace = true
|
||||
hyper.workspace = true
|
||||
itertools.workspace = true
|
||||
nix.workspace = true
|
||||
# hack to get the number of worker threads tokio uses
|
||||
num_cpus = { version = "1.15" }
|
||||
num-traits.workspace = true
|
||||
once_cell.workspace = true
|
||||
pin-project-lite.workspace = true
|
||||
|
||||
@@ -13,7 +13,6 @@ clap = { workspace = true, features = ["string"] }
|
||||
git-version.workspace = true
|
||||
pageserver = { path = ".." }
|
||||
postgres_ffi.workspace = true
|
||||
tokio.workspace = true
|
||||
utils.workspace = true
|
||||
svg_fmt.workspace = true
|
||||
workspace_hack.workspace = true
|
||||
|
||||
@@ -95,7 +95,7 @@ pub(crate) fn parse_filename(name: &str) -> Option<LayerFile> {
|
||||
}
|
||||
|
||||
// Finds the max_holes largest holes, ignoring any that are smaller than MIN_HOLE_LENGTH"
|
||||
async fn get_holes(path: &Path, max_holes: usize) -> Result<Vec<Hole>> {
|
||||
fn get_holes(path: &Path, max_holes: usize) -> Result<Vec<Hole>> {
|
||||
let file = FileBlockReader::new(VirtualFile::open(path)?);
|
||||
let summary_blk = file.read_blk(0)?;
|
||||
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
|
||||
@@ -129,7 +129,7 @@ async fn get_holes(path: &Path, max_holes: usize) -> Result<Vec<Hole>> {
|
||||
Ok(holes)
|
||||
}
|
||||
|
||||
pub(crate) async fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> {
|
||||
pub(crate) fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> {
|
||||
let storage_path = &cmd.path;
|
||||
let max_holes = cmd.max_holes.unwrap_or(DEFAULT_MAX_HOLES);
|
||||
|
||||
@@ -160,7 +160,7 @@ pub(crate) async fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> {
|
||||
parse_filename(&layer.file_name().into_string().unwrap())
|
||||
{
|
||||
if layer_file.is_delta {
|
||||
layer_file.holes = get_holes(&layer.path(), max_holes).await?;
|
||||
layer_file.holes = get_holes(&layer.path(), max_holes)?;
|
||||
n_deltas += 1;
|
||||
}
|
||||
layers.push(layer_file);
|
||||
|
||||
@@ -43,7 +43,8 @@ pub(crate) enum LayerCmd {
|
||||
},
|
||||
}
|
||||
|
||||
async fn read_delta_file(path: impl AsRef<Path>) -> Result<()> {
|
||||
fn read_delta_file(path: impl AsRef<Path>) -> Result<()> {
|
||||
use pageserver::tenant::blob_io::BlobCursor;
|
||||
use pageserver::tenant::block_io::BlockReader;
|
||||
|
||||
let path = path.as_ref();
|
||||
@@ -77,7 +78,7 @@ async fn read_delta_file(path: impl AsRef<Path>) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
|
||||
pub(crate) fn main(cmd: &LayerCmd) -> Result<()> {
|
||||
match cmd {
|
||||
LayerCmd::List { path } => {
|
||||
for tenant in fs::read_dir(path.join("tenants"))? {
|
||||
@@ -152,7 +153,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
|
||||
);
|
||||
|
||||
if layer_file.is_delta {
|
||||
read_delta_file(layer.path()).await?;
|
||||
read_delta_file(layer.path())?;
|
||||
} else {
|
||||
anyhow::bail!("not supported yet :(");
|
||||
}
|
||||
|
||||
@@ -72,13 +72,12 @@ struct AnalyzeLayerMapCmd {
|
||||
max_holes: Option<usize>,
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
fn main() -> anyhow::Result<()> {
|
||||
let cli = CliOpts::parse();
|
||||
|
||||
match cli.command {
|
||||
Commands::Layer(cmd) => {
|
||||
layers::main(&cmd).await?;
|
||||
layers::main(&cmd)?;
|
||||
}
|
||||
Commands::Metadata(cmd) => {
|
||||
handle_metadata(&cmd)?;
|
||||
@@ -87,7 +86,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
draw_timeline_dir::main()?;
|
||||
}
|
||||
Commands::AnalyzeLayerMap(cmd) => {
|
||||
layer_map_analyzer::main(&cmd).await?;
|
||||
layer_map_analyzer::main(&cmd)?;
|
||||
}
|
||||
Commands::PrintLayerFile(cmd) => {
|
||||
if let Err(e) = read_pg_control_file(&cmd.path) {
|
||||
@@ -95,7 +94,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
"Failed to read input file as a pg control one: {e:#}\n\
|
||||
Attempting to read it as layer file"
|
||||
);
|
||||
print_layerfile(&cmd.path).await?;
|
||||
print_layerfile(&cmd.path)?;
|
||||
}
|
||||
}
|
||||
};
|
||||
@@ -114,12 +113,12 @@ fn read_pg_control_file(control_file_path: &Path) -> anyhow::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn print_layerfile(path: &Path) -> anyhow::Result<()> {
|
||||
fn print_layerfile(path: &Path) -> anyhow::Result<()> {
|
||||
// Basic initialization of things that don't change after startup
|
||||
virtual_file::init(10);
|
||||
page_cache::init(100);
|
||||
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
|
||||
dump_layerfile_from_path(path, true, &ctx).await
|
||||
dump_layerfile_from_path(path, true, &ctx)
|
||||
}
|
||||
|
||||
fn handle_metadata(
|
||||
|
||||
@@ -994,29 +994,31 @@ async fn timeline_gc_handler(
|
||||
// Run compaction immediately on given timeline.
|
||||
async fn timeline_compact_handler(
|
||||
request: Request<Body>,
|
||||
cancel: CancellationToken,
|
||||
_cancel: CancellationToken,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
|
||||
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
|
||||
check_permission(&request, Some(tenant_id))?;
|
||||
|
||||
async {
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
|
||||
let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
|
||||
timeline
|
||||
.compact(&cancel, &ctx)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
.instrument(info_span!("manual_compaction", %tenant_id, %timeline_id))
|
||||
.await
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
|
||||
let result_receiver = mgr::immediate_compact(tenant_id, timeline_id, &ctx)
|
||||
.await
|
||||
.context("spawn compaction task")
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
let result: anyhow::Result<()> = result_receiver
|
||||
.await
|
||||
.context("receive compaction result")
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
result.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
// Run checkpoint immediately on given timeline.
|
||||
async fn timeline_checkpoint_handler(
|
||||
request: Request<Body>,
|
||||
cancel: CancellationToken,
|
||||
_cancel: CancellationToken,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
|
||||
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
|
||||
@@ -1029,13 +1031,13 @@ async fn timeline_checkpoint_handler(
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
timeline
|
||||
.compact(&cancel, &ctx)
|
||||
.compact(&ctx)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
.instrument(info_span!("manual_checkpoint", %tenant_id, %timeline_id))
|
||||
.instrument(info_span!("manual_checkpoint", tenant_id = %tenant_id, timeline_id = %timeline_id))
|
||||
.await
|
||||
}
|
||||
|
||||
|
||||
@@ -6,6 +6,7 @@ use metrics::{
|
||||
IntCounterVec, IntGauge, IntGaugeVec, UIntGauge, UIntGaugeVec,
|
||||
};
|
||||
use once_cell::sync::Lazy;
|
||||
use pageserver_api::models::TenantState;
|
||||
use strum::VariantNames;
|
||||
use strum_macros::{EnumVariantNames, IntoStaticStr};
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
@@ -83,10 +84,11 @@ pub static STORAGE_TIME_GLOBAL: Lazy<HistogramVec> = Lazy::new(|| {
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static READ_NUM_FS_LAYERS: Lazy<Histogram> = Lazy::new(|| {
|
||||
register_histogram!(
|
||||
static READ_NUM_FS_LAYERS: Lazy<HistogramVec> = Lazy::new(|| {
|
||||
register_histogram_vec!(
|
||||
"pageserver_read_num_fs_layers",
|
||||
"Number of persistent layers accessed for processing a read request, including those in the cache",
|
||||
&["tenant_id", "timeline_id"],
|
||||
vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 10.0, 20.0, 50.0, 100.0],
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
@@ -110,10 +112,11 @@ pub static MATERIALIZED_PAGE_CACHE_HIT_DIRECT: Lazy<IntCounter> = Lazy::new(|| {
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static GET_RECONSTRUCT_DATA_TIME: Lazy<Histogram> = Lazy::new(|| {
|
||||
register_histogram!(
|
||||
static GET_RECONSTRUCT_DATA_TIME: Lazy<HistogramVec> = Lazy::new(|| {
|
||||
register_histogram_vec!(
|
||||
"pageserver_getpage_get_reconstruct_data_seconds",
|
||||
"Time spent in get_reconstruct_value_data",
|
||||
&["tenant_id", "timeline_id"],
|
||||
CRITICAL_OP_BUCKETS.into(),
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
@@ -243,10 +246,11 @@ pub static PAGE_CACHE_SIZE: Lazy<PageCacheSizeMetrics> = Lazy::new(|| PageCacheS
|
||||
},
|
||||
});
|
||||
|
||||
pub(crate) static WAIT_LSN_TIME: Lazy<Histogram> = Lazy::new(|| {
|
||||
register_histogram!(
|
||||
static WAIT_LSN_TIME: Lazy<HistogramVec> = Lazy::new(|| {
|
||||
register_histogram_vec!(
|
||||
"pageserver_wait_lsn_seconds",
|
||||
"Time spent waiting for WAL to arrive",
|
||||
&["tenant_id", "timeline_id"],
|
||||
CRITICAL_OP_BUCKETS.into(),
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
@@ -305,24 +309,11 @@ static CURRENT_LOGICAL_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
|
||||
.expect("failed to define current logical size metric")
|
||||
});
|
||||
|
||||
pub(crate) static TENANT_STATE_METRIC: Lazy<UIntGaugeVec> = Lazy::new(|| {
|
||||
pub static TENANT_STATE_METRIC: Lazy<UIntGaugeVec> = Lazy::new(|| {
|
||||
register_uint_gauge_vec!(
|
||||
"pageserver_tenant_states_count",
|
||||
"Count of tenants per state",
|
||||
&["state"]
|
||||
)
|
||||
.expect("Failed to register pageserver_tenant_states_count metric")
|
||||
});
|
||||
|
||||
/// A set of broken tenants.
|
||||
///
|
||||
/// These are expected to be so rare that a set is fine. Set as in a new timeseries per each broken
|
||||
/// tenant.
|
||||
pub(crate) static BROKEN_TENANTS_SET: Lazy<UIntGaugeVec> = Lazy::new(|| {
|
||||
register_uint_gauge_vec!(
|
||||
"pageserver_broken_tenants_count",
|
||||
"Set of broken tenants",
|
||||
&["tenant_id"]
|
||||
&["tenant_id", "state"]
|
||||
)
|
||||
.expect("Failed to register pageserver_tenant_states_count metric")
|
||||
});
|
||||
@@ -508,31 +499,23 @@ const STORAGE_IO_TIME_BUCKETS: &[f64] = &[
|
||||
30.000, // 30000 ms
|
||||
];
|
||||
|
||||
/// Tracks time taken by fs operations near VirtualFile.
|
||||
///
|
||||
/// Operations:
|
||||
/// - open ([`std::fs::OpenOptions::open`])
|
||||
/// - close (dropping [`std::fs::File`])
|
||||
/// - close-by-replace (close by replacement algorithm)
|
||||
/// - read (`read_at`)
|
||||
/// - write (`write_at`)
|
||||
/// - seek (modify internal position or file length query)
|
||||
/// - fsync ([`std::fs::File::sync_all`])
|
||||
/// - metadata ([`std::fs::File::metadata`])
|
||||
pub(crate) static STORAGE_IO_TIME: Lazy<HistogramVec> = Lazy::new(|| {
|
||||
const STORAGE_IO_TIME_OPERATIONS: &[&str] = &[
|
||||
"open", "close", "read", "write", "seek", "fsync", "gc", "metadata",
|
||||
];
|
||||
|
||||
const STORAGE_IO_SIZE_OPERATIONS: &[&str] = &["read", "write"];
|
||||
|
||||
pub static STORAGE_IO_TIME: Lazy<HistogramVec> = Lazy::new(|| {
|
||||
register_histogram_vec!(
|
||||
"pageserver_io_operations_seconds",
|
||||
"Time spent in IO operations",
|
||||
&["operation"],
|
||||
&["operation", "tenant_id", "timeline_id"],
|
||||
STORAGE_IO_TIME_BUCKETS.into()
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
const STORAGE_IO_SIZE_OPERATIONS: &[&str] = &["read", "write"];
|
||||
|
||||
// Needed for the https://neonprod.grafana.net/d/5uK9tHL4k/picking-tenant-for-relocation?orgId=1
|
||||
pub(crate) static STORAGE_IO_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
|
||||
pub static STORAGE_IO_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
|
||||
register_int_gauge_vec!(
|
||||
"pageserver_io_operations_bytes_total",
|
||||
"Total amount of bytes read/written in IO operations",
|
||||
@@ -622,7 +605,7 @@ static REMOTE_TIMELINE_CLIENT_CALLS_STARTED_HIST: Lazy<HistogramVec> = Lazy::new
|
||||
at a given instant. It gives you a better idea of the queue depth \
|
||||
than plotting the gauge directly, since operations may complete faster \
|
||||
than the sampling interval.",
|
||||
&["file_kind", "op_kind"],
|
||||
&["tenant_id", "timeline_id", "file_kind", "op_kind"],
|
||||
// The calls_unfinished gauge is an integer gauge, hence we have integer buckets.
|
||||
vec![0.0, 1.0, 2.0, 4.0, 6.0, 8.0, 10.0, 15.0, 20.0, 40.0, 60.0, 80.0, 100.0, 500.0],
|
||||
)
|
||||
@@ -679,13 +662,13 @@ impl RemoteOpFileKind {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) static REMOTE_OPERATION_TIME: Lazy<HistogramVec> = Lazy::new(|| {
|
||||
pub static REMOTE_OPERATION_TIME: Lazy<HistogramVec> = Lazy::new(|| {
|
||||
register_histogram_vec!(
|
||||
"pageserver_remote_operation_seconds",
|
||||
"Time spent on remote storage operations. \
|
||||
Grouped by tenant, timeline, operation_kind and status. \
|
||||
Does not account for time spent waiting in remote timeline client's queues.",
|
||||
&["file_kind", "op_kind", "status"]
|
||||
&["tenant_id", "timeline_id", "file_kind", "op_kind", "status"]
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
@@ -914,6 +897,7 @@ impl StorageTimeMetrics {
|
||||
pub struct TimelineMetrics {
|
||||
tenant_id: String,
|
||||
timeline_id: String,
|
||||
pub get_reconstruct_data_time_histo: Histogram,
|
||||
pub flush_time_histo: StorageTimeMetrics,
|
||||
pub compact_time_histo: StorageTimeMetrics,
|
||||
pub create_images_time_histo: StorageTimeMetrics,
|
||||
@@ -922,7 +906,9 @@ pub struct TimelineMetrics {
|
||||
pub load_layer_map_histo: StorageTimeMetrics,
|
||||
pub garbage_collect_histo: StorageTimeMetrics,
|
||||
pub last_record_gauge: IntGauge,
|
||||
pub wait_lsn_time_histo: Histogram,
|
||||
pub resident_physical_size_gauge: UIntGauge,
|
||||
pub read_num_fs_layers: Histogram,
|
||||
/// copy of LayeredTimeline.current_logical_size
|
||||
pub current_logical_size_gauge: UIntGauge,
|
||||
pub num_persistent_files_created: IntCounter,
|
||||
@@ -939,6 +925,9 @@ impl TimelineMetrics {
|
||||
) -> Self {
|
||||
let tenant_id = tenant_id.to_string();
|
||||
let timeline_id = timeline_id.to_string();
|
||||
let get_reconstruct_data_time_histo = GET_RECONSTRUCT_DATA_TIME
|
||||
.get_metric_with_label_values(&[&tenant_id, &timeline_id])
|
||||
.unwrap();
|
||||
let flush_time_histo =
|
||||
StorageTimeMetrics::new(StorageTimeOperation::LayerFlush, &tenant_id, &timeline_id);
|
||||
let compact_time_histo =
|
||||
@@ -959,6 +948,9 @@ impl TimelineMetrics {
|
||||
let last_record_gauge = LAST_RECORD_LSN
|
||||
.get_metric_with_label_values(&[&tenant_id, &timeline_id])
|
||||
.unwrap();
|
||||
let wait_lsn_time_histo = WAIT_LSN_TIME
|
||||
.get_metric_with_label_values(&[&tenant_id, &timeline_id])
|
||||
.unwrap();
|
||||
let resident_physical_size_gauge = RESIDENT_PHYSICAL_SIZE
|
||||
.get_metric_with_label_values(&[&tenant_id, &timeline_id])
|
||||
.unwrap();
|
||||
@@ -974,12 +966,16 @@ impl TimelineMetrics {
|
||||
let evictions = EVICTIONS
|
||||
.get_metric_with_label_values(&[&tenant_id, &timeline_id])
|
||||
.unwrap();
|
||||
let read_num_fs_layers = READ_NUM_FS_LAYERS
|
||||
.get_metric_with_label_values(&[&tenant_id, &timeline_id])
|
||||
.unwrap();
|
||||
let evictions_with_low_residence_duration =
|
||||
evictions_with_low_residence_duration_builder.build(&tenant_id, &timeline_id);
|
||||
|
||||
TimelineMetrics {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
get_reconstruct_data_time_histo,
|
||||
flush_time_histo,
|
||||
compact_time_histo,
|
||||
create_images_time_histo,
|
||||
@@ -988,6 +984,7 @@ impl TimelineMetrics {
|
||||
garbage_collect_histo,
|
||||
load_layer_map_histo,
|
||||
last_record_gauge,
|
||||
wait_lsn_time_histo,
|
||||
resident_physical_size_gauge,
|
||||
current_logical_size_gauge,
|
||||
num_persistent_files_created,
|
||||
@@ -996,6 +993,7 @@ impl TimelineMetrics {
|
||||
evictions_with_low_residence_duration: std::sync::RwLock::new(
|
||||
evictions_with_low_residence_duration,
|
||||
),
|
||||
read_num_fs_layers,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1004,12 +1002,15 @@ impl Drop for TimelineMetrics {
|
||||
fn drop(&mut self) {
|
||||
let tenant_id = &self.tenant_id;
|
||||
let timeline_id = &self.timeline_id;
|
||||
let _ = GET_RECONSTRUCT_DATA_TIME.remove_label_values(&[tenant_id, timeline_id]);
|
||||
let _ = LAST_RECORD_LSN.remove_label_values(&[tenant_id, timeline_id]);
|
||||
let _ = WAIT_LSN_TIME.remove_label_values(&[tenant_id, timeline_id]);
|
||||
let _ = RESIDENT_PHYSICAL_SIZE.remove_label_values(&[tenant_id, timeline_id]);
|
||||
let _ = CURRENT_LOGICAL_SIZE.remove_label_values(&[tenant_id, timeline_id]);
|
||||
let _ = NUM_PERSISTENT_FILES_CREATED.remove_label_values(&[tenant_id, timeline_id]);
|
||||
let _ = PERSISTENT_BYTES_WRITTEN.remove_label_values(&[tenant_id, timeline_id]);
|
||||
let _ = EVICTIONS.remove_label_values(&[tenant_id, timeline_id]);
|
||||
let _ = READ_NUM_FS_LAYERS.remove_label_values(&[tenant_id, timeline_id]);
|
||||
|
||||
self.evictions_with_low_residence_duration
|
||||
.write()
|
||||
@@ -1021,6 +1022,9 @@ impl Drop for TimelineMetrics {
|
||||
let _ =
|
||||
STORAGE_TIME_COUNT_PER_TIMELINE.remove_label_values(&[op, tenant_id, timeline_id]);
|
||||
}
|
||||
for op in STORAGE_IO_TIME_OPERATIONS {
|
||||
let _ = STORAGE_IO_TIME.remove_label_values(&[op, tenant_id, timeline_id]);
|
||||
}
|
||||
|
||||
for op in STORAGE_IO_SIZE_OPERATIONS {
|
||||
let _ = STORAGE_IO_SIZE.remove_label_values(&[op, tenant_id, timeline_id]);
|
||||
@@ -1035,7 +1039,9 @@ impl Drop for TimelineMetrics {
|
||||
pub fn remove_tenant_metrics(tenant_id: &TenantId) {
|
||||
let tid = tenant_id.to_string();
|
||||
let _ = TENANT_SYNTHETIC_SIZE_METRIC.remove_label_values(&[&tid]);
|
||||
// we leave the BROKEN_TENANTS_SET entry if any
|
||||
for state in TenantState::VARIANTS {
|
||||
let _ = TENANT_STATE_METRIC.remove_label_values(&[&tid, state]);
|
||||
}
|
||||
}
|
||||
|
||||
use futures::Future;
|
||||
@@ -1050,7 +1056,9 @@ pub struct RemoteTimelineClientMetrics {
|
||||
tenant_id: String,
|
||||
timeline_id: String,
|
||||
remote_physical_size_gauge: Mutex<Option<UIntGauge>>,
|
||||
remote_operation_time: Mutex<HashMap<(&'static str, &'static str, &'static str), Histogram>>,
|
||||
calls_unfinished_gauge: Mutex<HashMap<(&'static str, &'static str), IntGauge>>,
|
||||
calls_started_hist: Mutex<HashMap<(&'static str, &'static str), Histogram>>,
|
||||
bytes_started_counter: Mutex<HashMap<(&'static str, &'static str), IntCounter>>,
|
||||
bytes_finished_counter: Mutex<HashMap<(&'static str, &'static str), IntCounter>>,
|
||||
}
|
||||
@@ -1060,13 +1068,14 @@ impl RemoteTimelineClientMetrics {
|
||||
RemoteTimelineClientMetrics {
|
||||
tenant_id: tenant_id.to_string(),
|
||||
timeline_id: timeline_id.to_string(),
|
||||
remote_operation_time: Mutex::new(HashMap::default()),
|
||||
calls_unfinished_gauge: Mutex::new(HashMap::default()),
|
||||
calls_started_hist: Mutex::new(HashMap::default()),
|
||||
bytes_started_counter: Mutex::new(HashMap::default()),
|
||||
bytes_finished_counter: Mutex::new(HashMap::default()),
|
||||
remote_physical_size_gauge: Mutex::new(None),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn remote_physical_size_gauge(&self) -> UIntGauge {
|
||||
let mut guard = self.remote_physical_size_gauge.lock().unwrap();
|
||||
guard
|
||||
@@ -1080,17 +1089,26 @@ impl RemoteTimelineClientMetrics {
|
||||
})
|
||||
.clone()
|
||||
}
|
||||
|
||||
pub fn remote_operation_time(
|
||||
&self,
|
||||
file_kind: &RemoteOpFileKind,
|
||||
op_kind: &RemoteOpKind,
|
||||
status: &'static str,
|
||||
) -> Histogram {
|
||||
let mut guard = self.remote_operation_time.lock().unwrap();
|
||||
let key = (file_kind.as_str(), op_kind.as_str(), status);
|
||||
REMOTE_OPERATION_TIME
|
||||
.get_metric_with_label_values(&[key.0, key.1, key.2])
|
||||
.unwrap()
|
||||
let metric = guard.entry(key).or_insert_with(move || {
|
||||
REMOTE_OPERATION_TIME
|
||||
.get_metric_with_label_values(&[
|
||||
&self.tenant_id.to_string(),
|
||||
&self.timeline_id.to_string(),
|
||||
key.0,
|
||||
key.1,
|
||||
key.2,
|
||||
])
|
||||
.unwrap()
|
||||
});
|
||||
metric.clone()
|
||||
}
|
||||
|
||||
fn calls_unfinished_gauge(
|
||||
@@ -1118,10 +1136,19 @@ impl RemoteTimelineClientMetrics {
|
||||
file_kind: &RemoteOpFileKind,
|
||||
op_kind: &RemoteOpKind,
|
||||
) -> Histogram {
|
||||
let mut guard = self.calls_started_hist.lock().unwrap();
|
||||
let key = (file_kind.as_str(), op_kind.as_str());
|
||||
REMOTE_TIMELINE_CLIENT_CALLS_STARTED_HIST
|
||||
.get_metric_with_label_values(&[key.0, key.1])
|
||||
.unwrap()
|
||||
let metric = guard.entry(key).or_insert_with(move || {
|
||||
REMOTE_TIMELINE_CLIENT_CALLS_STARTED_HIST
|
||||
.get_metric_with_label_values(&[
|
||||
&self.tenant_id.to_string(),
|
||||
&self.timeline_id.to_string(),
|
||||
key.0,
|
||||
key.1,
|
||||
])
|
||||
.unwrap()
|
||||
});
|
||||
metric.clone()
|
||||
}
|
||||
|
||||
fn bytes_started_counter(
|
||||
@@ -1301,10 +1328,15 @@ impl Drop for RemoteTimelineClientMetrics {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
remote_physical_size_gauge,
|
||||
remote_operation_time,
|
||||
calls_unfinished_gauge,
|
||||
calls_started_hist,
|
||||
bytes_started_counter,
|
||||
bytes_finished_counter,
|
||||
} = self;
|
||||
for ((a, b, c), _) in remote_operation_time.get_mut().unwrap().drain() {
|
||||
let _ = REMOTE_OPERATION_TIME.remove_label_values(&[tenant_id, timeline_id, a, b, c]);
|
||||
}
|
||||
for ((a, b), _) in calls_unfinished_gauge.get_mut().unwrap().drain() {
|
||||
let _ = REMOTE_TIMELINE_CLIENT_CALLS_UNFINISHED_GAUGE.remove_label_values(&[
|
||||
tenant_id,
|
||||
@@ -1313,6 +1345,14 @@ impl Drop for RemoteTimelineClientMetrics {
|
||||
b,
|
||||
]);
|
||||
}
|
||||
for ((a, b), _) in calls_started_hist.get_mut().unwrap().drain() {
|
||||
let _ = REMOTE_TIMELINE_CLIENT_CALLS_STARTED_HIST.remove_label_values(&[
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
a,
|
||||
b,
|
||||
]);
|
||||
}
|
||||
for ((a, b), _) in bytes_started_counter.get_mut().unwrap().drain() {
|
||||
let _ = REMOTE_TIMELINE_CLIENT_BYTES_STARTED_COUNTER.remove_label_values(&[
|
||||
tenant_id,
|
||||
|
||||
@@ -130,25 +130,11 @@ pub static WALRECEIVER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
|
||||
pub static BACKGROUND_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.thread_name("background op worker")
|
||||
// if you change the number of worker threads please change the constant below
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("Failed to create background op runtime")
|
||||
});
|
||||
|
||||
pub(crate) static BACKGROUND_RUNTIME_WORKER_THREADS: Lazy<usize> = Lazy::new(|| {
|
||||
// force init and thus panics
|
||||
let _ = BACKGROUND_RUNTIME.handle();
|
||||
// replicates tokio-1.28.1::loom::sys::num_cpus which is not available publicly
|
||||
// tokio would had already panicked for parsing errors or NotUnicode
|
||||
//
|
||||
// this will be wrong if any of the runtimes gets their worker threads configured to something
|
||||
// else, but that has not been needed in a long time.
|
||||
std::env::var("TOKIO_WORKER_THREADS")
|
||||
.map(|s| s.parse::<usize>().unwrap())
|
||||
.unwrap_or_else(|_e| usize::max(1, num_cpus::get()))
|
||||
});
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct PageserverTaskId(u64);
|
||||
|
||||
@@ -559,7 +545,7 @@ pub fn current_task_id() -> Option<PageserverTaskId> {
|
||||
pub async fn shutdown_watcher() {
|
||||
let token = SHUTDOWN_TOKEN
|
||||
.try_with(|t| t.clone())
|
||||
.expect("shutdown_watcher() called in an unexpected task or thread");
|
||||
.expect("shutdown_requested() called in an unexpected task or thread");
|
||||
|
||||
token.cancelled().await;
|
||||
}
|
||||
|
||||
@@ -20,7 +20,6 @@ use storage_broker::BrokerClientChannel;
|
||||
use tokio::sync::watch;
|
||||
use tokio::sync::OwnedMutexGuard;
|
||||
use tokio::task::JoinSet;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::completion;
|
||||
use utils::crashsafe::path_with_suffix_extension;
|
||||
@@ -1336,11 +1335,7 @@ impl Tenant {
|
||||
/// This function is periodically called by compactor task.
|
||||
/// Also it can be explicitly requested per timeline through page server
|
||||
/// api's 'compact' command.
|
||||
pub async fn compaction_iteration(
|
||||
&self,
|
||||
cancel: &CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
pub async fn compaction_iteration(&self, ctx: &RequestContext) -> anyhow::Result<()> {
|
||||
anyhow::ensure!(
|
||||
self.is_active(),
|
||||
"Cannot run compaction iteration on inactive tenant"
|
||||
@@ -1368,7 +1363,7 @@ impl Tenant {
|
||||
|
||||
for (timeline_id, timeline) in &timelines_to_compact {
|
||||
timeline
|
||||
.compact(cancel, ctx)
|
||||
.compact(ctx)
|
||||
.instrument(info_span!("compact_timeline", %timeline_id))
|
||||
.await?;
|
||||
}
|
||||
@@ -2199,44 +2194,28 @@ impl Tenant {
|
||||
let (state, mut rx) = watch::channel(state);
|
||||
|
||||
tokio::spawn(async move {
|
||||
let mut current_state: &'static str = From::from(&*rx.borrow_and_update());
|
||||
let tid = tenant_id.to_string();
|
||||
|
||||
fn inspect_state(state: &TenantState) -> ([&'static str; 1], bool) {
|
||||
([state.into()], matches!(state, TenantState::Broken { .. }))
|
||||
}
|
||||
|
||||
let mut tuple = inspect_state(&rx.borrow_and_update());
|
||||
|
||||
let is_broken = tuple.1;
|
||||
if !is_broken {
|
||||
// the tenant might be ignored and reloaded, so first remove any previous set
|
||||
// element. it most likely has already been scraped, as these are manual operations
|
||||
// right now. most likely we will add it back very soon.
|
||||
drop(crate::metrics::BROKEN_TENANTS_SET.remove_label_values(&[&tid]));
|
||||
}
|
||||
|
||||
TENANT_STATE_METRIC
|
||||
.with_label_values(&[&tid, current_state])
|
||||
.inc();
|
||||
loop {
|
||||
let labels = &tuple.0;
|
||||
let current = TENANT_STATE_METRIC.with_label_values(labels);
|
||||
current.inc();
|
||||
match rx.changed().await {
|
||||
Ok(()) => {
|
||||
let new_state: &'static str = From::from(&*rx.borrow_and_update());
|
||||
TENANT_STATE_METRIC
|
||||
.with_label_values(&[&tid, current_state])
|
||||
.dec();
|
||||
TENANT_STATE_METRIC
|
||||
.with_label_values(&[&tid, new_state])
|
||||
.inc();
|
||||
|
||||
if rx.changed().await.is_err() {
|
||||
// tenant has been dropped; decrement the counter because a tenant with that
|
||||
// state is no longer in tenant map, but allow any broken set item to exist
|
||||
// still.
|
||||
current.dec();
|
||||
break;
|
||||
}
|
||||
|
||||
current.dec();
|
||||
tuple = inspect_state(&rx.borrow_and_update());
|
||||
|
||||
let is_broken = tuple.1;
|
||||
if is_broken {
|
||||
// insert the tenant_id (back) into the set
|
||||
crate::metrics::BROKEN_TENANTS_SET
|
||||
.with_label_values(&[&tid])
|
||||
.inc();
|
||||
current_state = new_state;
|
||||
}
|
||||
Err(_sender_dropped_error) => {
|
||||
info!("Tenant dropped the state updates sender, quitting waiting for tenant state change");
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -3231,7 +3210,7 @@ impl Drop for Tenant {
|
||||
}
|
||||
}
|
||||
/// Dump contents of a layer file to stdout.
|
||||
pub async fn dump_layerfile_from_path(
|
||||
pub fn dump_layerfile_from_path(
|
||||
path: &Path,
|
||||
verbose: bool,
|
||||
ctx: &RequestContext,
|
||||
@@ -3245,16 +3224,8 @@ pub async fn dump_layerfile_from_path(
|
||||
file.read_exact_at(&mut header_buf, 0)?;
|
||||
|
||||
match u16::from_be_bytes(header_buf) {
|
||||
crate::IMAGE_FILE_MAGIC => {
|
||||
ImageLayer::new_for_path(path, file)?
|
||||
.dump(verbose, ctx)
|
||||
.await?
|
||||
}
|
||||
crate::DELTA_FILE_MAGIC => {
|
||||
DeltaLayer::new_for_path(path, file)?
|
||||
.dump(verbose, ctx)
|
||||
.await?
|
||||
}
|
||||
crate::IMAGE_FILE_MAGIC => ImageLayer::new_for_path(path, file)?.dump(verbose, ctx)?,
|
||||
crate::DELTA_FILE_MAGIC => DeltaLayer::new_for_path(path, file)?.dump(verbose, ctx)?,
|
||||
magic => bail!("unrecognized magic identifier: {:?}", magic),
|
||||
}
|
||||
|
||||
@@ -3470,7 +3441,6 @@ mod tests {
|
||||
use hex_literal::hex;
|
||||
use once_cell::sync::Lazy;
|
||||
use rand::{thread_rng, Rng};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
static TEST_KEY: Lazy<Key> =
|
||||
Lazy::new(|| Key::from_slice(&hex!("112222222233333333444444445500000001")));
|
||||
@@ -3992,7 +3962,7 @@ mod tests {
|
||||
drop(writer);
|
||||
|
||||
tline.freeze_and_flush().await?;
|
||||
tline.compact(&CancellationToken::new(), &ctx).await?;
|
||||
tline.compact(&ctx).await?;
|
||||
|
||||
let writer = tline.writer().await;
|
||||
writer
|
||||
@@ -4002,7 +3972,7 @@ mod tests {
|
||||
drop(writer);
|
||||
|
||||
tline.freeze_and_flush().await?;
|
||||
tline.compact(&CancellationToken::new(), &ctx).await?;
|
||||
tline.compact(&ctx).await?;
|
||||
|
||||
let writer = tline.writer().await;
|
||||
writer
|
||||
@@ -4012,7 +3982,7 @@ mod tests {
|
||||
drop(writer);
|
||||
|
||||
tline.freeze_and_flush().await?;
|
||||
tline.compact(&CancellationToken::new(), &ctx).await?;
|
||||
tline.compact(&ctx).await?;
|
||||
|
||||
let writer = tline.writer().await;
|
||||
writer
|
||||
@@ -4022,7 +3992,7 @@ mod tests {
|
||||
drop(writer);
|
||||
|
||||
tline.freeze_and_flush().await?;
|
||||
tline.compact(&CancellationToken::new(), &ctx).await?;
|
||||
tline.compact(&ctx).await?;
|
||||
|
||||
assert_eq!(
|
||||
tline.get(*TEST_KEY, Lsn(0x10), &ctx).await?,
|
||||
@@ -4091,7 +4061,7 @@ mod tests {
|
||||
.update_gc_info(Vec::new(), cutoff, Duration::ZERO, &ctx)
|
||||
.await?;
|
||||
tline.freeze_and_flush().await?;
|
||||
tline.compact(&CancellationToken::new(), &ctx).await?;
|
||||
tline.compact(&ctx).await?;
|
||||
tline.gc().await?;
|
||||
}
|
||||
|
||||
@@ -4168,7 +4138,7 @@ mod tests {
|
||||
.update_gc_info(Vec::new(), cutoff, Duration::ZERO, &ctx)
|
||||
.await?;
|
||||
tline.freeze_and_flush().await?;
|
||||
tline.compact(&CancellationToken::new(), &ctx).await?;
|
||||
tline.compact(&ctx).await?;
|
||||
tline.gc().await?;
|
||||
}
|
||||
|
||||
@@ -4256,7 +4226,7 @@ mod tests {
|
||||
.update_gc_info(Vec::new(), cutoff, Duration::ZERO, &ctx)
|
||||
.await?;
|
||||
tline.freeze_and_flush().await?;
|
||||
tline.compact(&CancellationToken::new(), &ctx).await?;
|
||||
tline.compact(&ctx).await?;
|
||||
tline.gc().await?;
|
||||
}
|
||||
|
||||
|
||||
@@ -16,19 +16,29 @@ use crate::tenant::block_io::{BlockCursor, BlockReader};
|
||||
use std::cmp::min;
|
||||
use std::io::{Error, ErrorKind};
|
||||
|
||||
impl<R> BlockCursor<R>
|
||||
where
|
||||
R: BlockReader,
|
||||
{
|
||||
/// For reading
|
||||
pub trait BlobCursor {
|
||||
/// Read a blob into a new buffer.
|
||||
pub fn read_blob(&mut self, offset: u64) -> Result<Vec<u8>, std::io::Error> {
|
||||
fn read_blob(&mut self, offset: u64) -> Result<Vec<u8>, std::io::Error> {
|
||||
let mut buf = Vec::new();
|
||||
self.read_blob_into_buf(offset, &mut buf)?;
|
||||
Ok(buf)
|
||||
}
|
||||
|
||||
/// Read blob into the given buffer. Any previous contents in the buffer
|
||||
/// are overwritten.
|
||||
pub fn read_blob_into_buf(
|
||||
fn read_blob_into_buf(
|
||||
&mut self,
|
||||
offset: u64,
|
||||
dstbuf: &mut Vec<u8>,
|
||||
) -> Result<(), std::io::Error>;
|
||||
}
|
||||
|
||||
impl<R> BlobCursor for BlockCursor<R>
|
||||
where
|
||||
R: BlockReader,
|
||||
{
|
||||
fn read_blob_into_buf(
|
||||
&mut self,
|
||||
offset: u64,
|
||||
dstbuf: &mut Vec<u8>,
|
||||
|
||||
@@ -328,7 +328,7 @@ fn to_io_error(e: anyhow::Error, context: &str) -> io::Error {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::tenant::blob_io::BlobWriter;
|
||||
use crate::tenant::blob_io::{BlobCursor, BlobWriter};
|
||||
use crate::tenant::block_io::BlockCursor;
|
||||
use rand::{seq::SliceRandom, thread_rng, RngCore};
|
||||
use std::fs;
|
||||
|
||||
@@ -626,17 +626,17 @@ impl LayerMap {
|
||||
|
||||
/// debugging function to print out the contents of the layer map
|
||||
#[allow(unused)]
|
||||
pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
|
||||
pub fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
|
||||
println!("Begin dump LayerMap");
|
||||
|
||||
println!("open_layer:");
|
||||
if let Some(open_layer) = &self.open_layer {
|
||||
open_layer.dump(verbose, ctx).await?;
|
||||
open_layer.dump(verbose, ctx)?;
|
||||
}
|
||||
|
||||
println!("frozen_layers:");
|
||||
for frozen_layer in self.frozen_layers.iter() {
|
||||
frozen_layer.dump(verbose, ctx).await?;
|
||||
frozen_layer.dump(verbose, ctx)?;
|
||||
}
|
||||
|
||||
println!("historic_layers:");
|
||||
|
||||
@@ -768,6 +768,55 @@ pub async fn immediate_gc(
|
||||
Ok(wait_task_done)
|
||||
}
|
||||
|
||||
pub async fn immediate_compact(
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<tokio::sync::oneshot::Receiver<anyhow::Result<()>>, ApiError> {
|
||||
let guard = TENANTS.read().await;
|
||||
|
||||
let tenant = guard
|
||||
.get(&tenant_id)
|
||||
.map(Arc::clone)
|
||||
.with_context(|| format!("tenant {tenant_id}"))
|
||||
.map_err(|e| ApiError::NotFound(e.into()))?;
|
||||
|
||||
let timeline = tenant
|
||||
.get_timeline(timeline_id, true)
|
||||
.map_err(|e| ApiError::NotFound(e.into()))?;
|
||||
|
||||
// Run in task_mgr to avoid race with tenant_detach operation
|
||||
let ctx = ctx.detached_child(TaskKind::Compaction, DownloadBehavior::Download);
|
||||
let (task_done, wait_task_done) = tokio::sync::oneshot::channel();
|
||||
task_mgr::spawn(
|
||||
&tokio::runtime::Handle::current(),
|
||||
TaskKind::Compaction,
|
||||
Some(tenant_id),
|
||||
Some(timeline_id),
|
||||
&format!(
|
||||
"timeline_compact_handler compaction run for tenant {tenant_id} timeline {timeline_id}"
|
||||
),
|
||||
false,
|
||||
async move {
|
||||
let result = timeline
|
||||
.compact(&ctx)
|
||||
.instrument(info_span!("manual_compact", %tenant_id, %timeline_id))
|
||||
.await;
|
||||
|
||||
match task_done.send(result) {
|
||||
Ok(_) => (),
|
||||
Err(result) => error!("failed to send compaction result: {result:?}"),
|
||||
}
|
||||
Ok(())
|
||||
},
|
||||
);
|
||||
|
||||
// drop the guard until after we've spawned the task so that timeline shutdown will wait for the task
|
||||
drop(guard);
|
||||
|
||||
Ok(wait_task_done)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::collections::HashMap;
|
||||
|
||||
@@ -338,8 +338,7 @@ impl LayerAccessStats {
|
||||
/// All layers should implement a minimal `std::fmt::Debug` without tenant or
|
||||
/// timeline names, because those are known in the context of which the layers
|
||||
/// are used in (timeline).
|
||||
#[async_trait::async_trait]
|
||||
pub trait Layer: std::fmt::Debug + std::fmt::Display + Send + Sync + 'static {
|
||||
pub trait Layer: std::fmt::Debug + std::fmt::Display + Send + Sync {
|
||||
/// Range of keys that this layer covers
|
||||
fn get_key_range(&self) -> Range<Key>;
|
||||
|
||||
@@ -369,7 +368,7 @@ pub trait Layer: std::fmt::Debug + std::fmt::Display + Send + Sync + 'static {
|
||||
/// is available. If this returns ValueReconstructResult::Continue, look up
|
||||
/// the predecessor layer and call again with the same 'reconstruct_data' to
|
||||
/// collect more data.
|
||||
async fn get_value_reconstruct_data(
|
||||
fn get_value_reconstruct_data(
|
||||
&self,
|
||||
key: Key,
|
||||
lsn_range: Range<Lsn>,
|
||||
@@ -378,7 +377,7 @@ pub trait Layer: std::fmt::Debug + std::fmt::Display + Send + Sync + 'static {
|
||||
) -> Result<ValueReconstructResult>;
|
||||
|
||||
/// Dump summary of the contents of the layer to stdout
|
||||
async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()>;
|
||||
fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()>;
|
||||
}
|
||||
|
||||
/// Returned by [`PersistentLayer::iter`]
|
||||
|
||||
@@ -31,7 +31,7 @@ use crate::config::PageServerConf;
|
||||
use crate::context::RequestContext;
|
||||
use crate::page_cache::{PageReadGuard, PAGE_SZ};
|
||||
use crate::repository::{Key, Value, KEY_SIZE};
|
||||
use crate::tenant::blob_io::{BlobWriter, WriteBlobWriter};
|
||||
use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
|
||||
use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockReader, FileBlockReader};
|
||||
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
|
||||
use crate::tenant::storage_layer::{
|
||||
@@ -223,10 +223,9 @@ impl std::fmt::Debug for DeltaLayerInner {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Layer for DeltaLayer {
|
||||
/// debugging function to print out the contents of the layer
|
||||
async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
|
||||
fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
|
||||
println!(
|
||||
"----- delta layer for ten {} tli {} keys {}-{} lsn {}-{} size {} ----",
|
||||
self.desc.tenant_id,
|
||||
@@ -301,7 +300,7 @@ impl Layer for DeltaLayer {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn get_value_reconstruct_data(
|
||||
fn get_value_reconstruct_data(
|
||||
&self,
|
||||
key: Key,
|
||||
lsn_range: Range<Lsn>,
|
||||
|
||||
@@ -27,7 +27,7 @@ use crate::config::PageServerConf;
|
||||
use crate::context::RequestContext;
|
||||
use crate::page_cache::PAGE_SZ;
|
||||
use crate::repository::{Key, KEY_SIZE};
|
||||
use crate::tenant::blob_io::{BlobWriter, WriteBlobWriter};
|
||||
use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
|
||||
use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
|
||||
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
|
||||
use crate::tenant::storage_layer::{
|
||||
@@ -155,10 +155,9 @@ impl std::fmt::Debug for ImageLayerInner {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Layer for ImageLayer {
|
||||
/// debugging function to print out the contents of the layer
|
||||
async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
|
||||
fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
|
||||
println!(
|
||||
"----- image layer for ten {} tli {} key {}-{} at {} is_incremental {} size {} ----",
|
||||
self.desc.tenant_id,
|
||||
@@ -190,7 +189,7 @@ impl Layer for ImageLayer {
|
||||
}
|
||||
|
||||
/// Look up given page in the file
|
||||
async fn get_value_reconstruct_data(
|
||||
fn get_value_reconstruct_data(
|
||||
&self,
|
||||
key: Key,
|
||||
lsn_range: Range<Lsn>,
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::RequestContext;
|
||||
use crate::repository::{Key, Value};
|
||||
use crate::tenant::blob_io::BlobWriter;
|
||||
use crate::tenant::blob_io::{BlobCursor, BlobWriter};
|
||||
use crate::tenant::block_io::BlockReader;
|
||||
use crate::tenant::ephemeral_file::EphemeralFile;
|
||||
use crate::tenant::storage_layer::{ValueReconstructResult, ValueReconstructState};
|
||||
@@ -110,7 +110,6 @@ impl InMemoryLayer {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Layer for InMemoryLayer {
|
||||
fn get_key_range(&self) -> Range<Key> {
|
||||
Key::MIN..Key::MAX
|
||||
@@ -133,7 +132,7 @@ impl Layer for InMemoryLayer {
|
||||
}
|
||||
|
||||
/// debugging function to print out the contents of the layer
|
||||
async fn dump(&self, verbose: bool, _ctx: &RequestContext) -> Result<()> {
|
||||
fn dump(&self, verbose: bool, _ctx: &RequestContext) -> Result<()> {
|
||||
let inner = self.inner.read().unwrap();
|
||||
|
||||
let end_str = inner
|
||||
@@ -184,7 +183,7 @@ impl Layer for InMemoryLayer {
|
||||
}
|
||||
|
||||
/// Look up given value in the layer.
|
||||
async fn get_value_reconstruct_data(
|
||||
fn get_value_reconstruct_data(
|
||||
&self,
|
||||
key: Key,
|
||||
lsn_range: Range<Lsn>,
|
||||
|
||||
@@ -65,9 +65,8 @@ impl std::fmt::Debug for RemoteLayer {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Layer for RemoteLayer {
|
||||
async fn get_value_reconstruct_data(
|
||||
fn get_value_reconstruct_data(
|
||||
&self,
|
||||
_key: Key,
|
||||
_lsn_range: Range<Lsn>,
|
||||
@@ -78,7 +77,7 @@ impl Layer for RemoteLayer {
|
||||
}
|
||||
|
||||
/// debugging function to print out the contents of the layer
|
||||
async fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
|
||||
fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
|
||||
println!(
|
||||
"----- remote layer for ten {} tli {} keys {}-{} lsn {}-{} is_delta {} is_incremental {} size {} ----",
|
||||
self.desc.tenant_id,
|
||||
|
||||
@@ -111,7 +111,7 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
|
||||
Duration::from_secs(10)
|
||||
} else {
|
||||
// Run compaction
|
||||
if let Err(e) = tenant.compaction_iteration(&cancel, &ctx).await {
|
||||
if let Err(e) = tenant.compaction_iteration(&ctx).await {
|
||||
error!("Compaction failed, retrying in {:?}: {e:?}", wait_duration);
|
||||
wait_duration
|
||||
} else {
|
||||
|
||||
@@ -334,7 +334,7 @@ pub struct GcInfo {
|
||||
#[derive(thiserror::Error)]
|
||||
pub enum PageReconstructError {
|
||||
#[error(transparent)]
|
||||
Other(#[from] anyhow::Error),
|
||||
Other(#[from] anyhow::Error), // source and Display delegate to anyhow::Error
|
||||
|
||||
/// The operation would require downloading a layer that is missing locally.
|
||||
NeedsDownload(TenantTimelineId, LayerFileName),
|
||||
@@ -475,7 +475,7 @@ impl Timeline {
|
||||
img: cached_page_img,
|
||||
};
|
||||
|
||||
let timer = crate::metrics::GET_RECONSTRUCT_DATA_TIME.start_timer();
|
||||
let timer = self.metrics.get_reconstruct_data_time_histo.start_timer();
|
||||
self.get_reconstruct_data(key, lsn, &mut reconstruct_state, ctx)
|
||||
.await?;
|
||||
timer.stop_and_record();
|
||||
@@ -555,7 +555,7 @@ impl Timeline {
|
||||
"wait_lsn cannot be called in WAL receiver"
|
||||
);
|
||||
|
||||
let _timer = crate::metrics::WAIT_LSN_TIME.start_timer();
|
||||
let _timer = self.metrics.wait_lsn_time_histo.start_timer();
|
||||
|
||||
match self
|
||||
.last_record_lsn
|
||||
@@ -611,46 +611,9 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Outermost timeline compaction operation; downloads needed layers.
|
||||
pub async fn compact(
|
||||
self: &Arc<Self>,
|
||||
cancel: &CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
pub async fn compact(self: &Arc<Self>, ctx: &RequestContext) -> anyhow::Result<()> {
|
||||
const ROUNDS: usize = 2;
|
||||
|
||||
static CONCURRENT_COMPACTIONS: once_cell::sync::Lazy<tokio::sync::Semaphore> =
|
||||
once_cell::sync::Lazy::new(|| {
|
||||
let total_threads = *task_mgr::BACKGROUND_RUNTIME_WORKER_THREADS;
|
||||
let permits = usize::max(
|
||||
1,
|
||||
// while a lot of the work is done on spawn_blocking, we still do
|
||||
// repartitioning in the async context. this should give leave us some workers
|
||||
// unblocked to be blocked on other work, hopefully easing any outside visible
|
||||
// effects of restarts.
|
||||
//
|
||||
// 6/8 is a guess; previously we ran with unlimited 8 and more from
|
||||
// spawn_blocking.
|
||||
(total_threads * 3).checked_div(4).unwrap_or(0),
|
||||
);
|
||||
assert_ne!(permits, 0, "we will not be adding in permits later");
|
||||
assert!(
|
||||
permits < total_threads,
|
||||
"need threads avail for shorter work"
|
||||
);
|
||||
tokio::sync::Semaphore::new(permits)
|
||||
});
|
||||
|
||||
// this wait probably never needs any "long time spent" logging, because we already nag if
|
||||
// compaction task goes over it's period (20s) which is quite often in production.
|
||||
let _permit = tokio::select! {
|
||||
permit = CONCURRENT_COMPACTIONS.acquire() => {
|
||||
permit
|
||||
},
|
||||
_ = cancel.cancelled() => {
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
|
||||
let last_record_lsn = self.get_last_record_lsn();
|
||||
|
||||
// Last record Lsn could be zero in case the timeline was just created
|
||||
@@ -708,9 +671,11 @@ impl Timeline {
|
||||
|
||||
let mut failed = 0;
|
||||
|
||||
let mut cancelled = pin!(task_mgr::shutdown_watcher());
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = cancel.cancelled() => anyhow::bail!("Cancelled while downloading remote layers"),
|
||||
_ = &mut cancelled => anyhow::bail!("Cancelled while downloading remote layers"),
|
||||
res = downloads.next() => {
|
||||
match res {
|
||||
Some(Ok(())) => {},
|
||||
@@ -925,7 +890,7 @@ impl Timeline {
|
||||
new_state,
|
||||
TimelineState::Stopping | TimelineState::Broken { .. }
|
||||
) {
|
||||
// drop the completion guard, if any; it might be holding off the completion
|
||||
// drop the copmletion guard, if any; it might be holding off the completion
|
||||
// forever needlessly
|
||||
self.initial_logical_size_attempt
|
||||
.lock()
|
||||
@@ -2287,9 +2252,8 @@ impl Timeline {
|
||||
let mut timeline_owned;
|
||||
let mut timeline = self;
|
||||
|
||||
let mut read_count = scopeguard::guard(0, |cnt| {
|
||||
crate::metrics::READ_NUM_FS_LAYERS.observe(cnt as f64)
|
||||
});
|
||||
let mut read_count =
|
||||
scopeguard::guard(0, |cnt| self.metrics.read_num_fs_layers.observe(cnt as f64));
|
||||
|
||||
// For debugging purposes, collect the path of layers that we traversed
|
||||
// through. It's included in the error message if we fail to find the key.
|
||||
@@ -2423,15 +2387,12 @@ impl Timeline {
|
||||
// Get all the data needed to reconstruct the page version from this layer.
|
||||
// But if we have an older cached page image, no need to go past that.
|
||||
let lsn_floor = max(cached_lsn + 1, start_lsn);
|
||||
result = match open_layer
|
||||
.get_value_reconstruct_data(
|
||||
key,
|
||||
lsn_floor..cont_lsn,
|
||||
reconstruct_state,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
{
|
||||
result = match open_layer.get_value_reconstruct_data(
|
||||
key,
|
||||
lsn_floor..cont_lsn,
|
||||
reconstruct_state,
|
||||
ctx,
|
||||
) {
|
||||
Ok(result) => result,
|
||||
Err(e) => return Err(PageReconstructError::from(e)),
|
||||
};
|
||||
@@ -2453,15 +2414,12 @@ impl Timeline {
|
||||
if cont_lsn > start_lsn {
|
||||
//info!("CHECKING for {} at {} on frozen layer {}", key, cont_lsn, frozen_layer.filename().display());
|
||||
let lsn_floor = max(cached_lsn + 1, start_lsn);
|
||||
result = match frozen_layer
|
||||
.get_value_reconstruct_data(
|
||||
key,
|
||||
lsn_floor..cont_lsn,
|
||||
reconstruct_state,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
{
|
||||
result = match frozen_layer.get_value_reconstruct_data(
|
||||
key,
|
||||
lsn_floor..cont_lsn,
|
||||
reconstruct_state,
|
||||
ctx,
|
||||
) {
|
||||
Ok(result) => result,
|
||||
Err(e) => return Err(PageReconstructError::from(e)),
|
||||
};
|
||||
@@ -2492,15 +2450,12 @@ impl Timeline {
|
||||
// Get all the data needed to reconstruct the page version from this layer.
|
||||
// But if we have an older cached page image, no need to go past that.
|
||||
let lsn_floor = max(cached_lsn + 1, lsn_floor);
|
||||
result = match layer
|
||||
.get_value_reconstruct_data(
|
||||
key,
|
||||
lsn_floor..cont_lsn,
|
||||
reconstruct_state,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
{
|
||||
result = match layer.get_value_reconstruct_data(
|
||||
key,
|
||||
lsn_floor..cont_lsn,
|
||||
reconstruct_state,
|
||||
ctx,
|
||||
) {
|
||||
Ok(result) => result,
|
||||
Err(e) => return Err(PageReconstructError::from(e)),
|
||||
};
|
||||
@@ -3497,7 +3452,7 @@ impl Timeline {
|
||||
let mut prev: Option<Key> = None;
|
||||
for (next_key, _next_lsn, _size) in itertools::process_results(
|
||||
deltas_to_compact.iter().map(|l| l.key_iter(ctx)),
|
||||
|iter_iter| iter_iter.kmerge_by(|a, b| a.0 < b.0),
|
||||
|iter_iter| iter_iter.kmerge_by(|a, b| a.0 <= b.0),
|
||||
)? {
|
||||
if let Some(prev_key) = prev {
|
||||
// just first fast filter
|
||||
@@ -3537,7 +3492,11 @@ impl Timeline {
|
||||
iter_iter.kmerge_by(|a, b| {
|
||||
if let Ok((a_key, a_lsn, _)) = a {
|
||||
if let Ok((b_key, b_lsn, _)) = b {
|
||||
(a_key, a_lsn) < (b_key, b_lsn)
|
||||
match a_key.cmp(b_key) {
|
||||
Ordering::Less => true,
|
||||
Ordering::Equal => a_lsn <= b_lsn,
|
||||
Ordering::Greater => false,
|
||||
}
|
||||
} else {
|
||||
false
|
||||
}
|
||||
@@ -3555,7 +3514,11 @@ impl Timeline {
|
||||
iter_iter.kmerge_by(|a, b| {
|
||||
let (a_key, a_lsn, _) = a;
|
||||
let (b_key, b_lsn, _) = b;
|
||||
(a_key, a_lsn) < (b_key, b_lsn)
|
||||
match a_key.cmp(b_key) {
|
||||
Ordering::Less => true,
|
||||
Ordering::Equal => a_lsn <= b_lsn,
|
||||
Ordering::Greater => false,
|
||||
}
|
||||
})
|
||||
},
|
||||
)?;
|
||||
|
||||
@@ -149,10 +149,12 @@ impl OpenFiles {
|
||||
// old file.
|
||||
//
|
||||
if let Some(old_file) = slot_guard.file.take() {
|
||||
// the normal path of dropping VirtualFile uses "close", use "close-by-replace" here to
|
||||
// distinguish the two.
|
||||
// We do not have information about tenant_id/timeline_id of evicted file.
|
||||
// It is possible to store path together with file or use filepath crate,
|
||||
// but as far as close() is not expected to be fast, it is not so critical to gather
|
||||
// precise per-tenant statistic here.
|
||||
STORAGE_IO_TIME
|
||||
.with_label_values(&["close-by-replace"])
|
||||
.with_label_values(&["close", "-", "-"])
|
||||
.observe_closure_duration(|| drop(old_file));
|
||||
}
|
||||
|
||||
@@ -206,7 +208,7 @@ impl VirtualFile {
|
||||
}
|
||||
let (handle, mut slot_guard) = get_open_files().find_victim_slot();
|
||||
let file = STORAGE_IO_TIME
|
||||
.with_label_values(&["open"])
|
||||
.with_label_values(&["open", &tenant_id, &timeline_id])
|
||||
.observe_closure_duration(|| open_options.open(path))?;
|
||||
|
||||
// Strip all options other than read and write.
|
||||
@@ -269,7 +271,7 @@ impl VirtualFile {
|
||||
// Found a cached file descriptor.
|
||||
slot.recently_used.store(true, Ordering::Relaxed);
|
||||
return Ok(STORAGE_IO_TIME
|
||||
.with_label_values(&[op])
|
||||
.with_label_values(&[op, &self.tenant_id, &self.timeline_id])
|
||||
.observe_closure_duration(|| func(file)));
|
||||
}
|
||||
}
|
||||
@@ -296,12 +298,12 @@ impl VirtualFile {
|
||||
|
||||
// Open the physical file
|
||||
let file = STORAGE_IO_TIME
|
||||
.with_label_values(&["open"])
|
||||
.with_label_values(&["open", &self.tenant_id, &self.timeline_id])
|
||||
.observe_closure_duration(|| self.open_options.open(&self.path))?;
|
||||
|
||||
// Perform the requested operation on it
|
||||
let result = STORAGE_IO_TIME
|
||||
.with_label_values(&[op])
|
||||
.with_label_values(&[op, &self.tenant_id, &self.timeline_id])
|
||||
.observe_closure_duration(|| func(&file));
|
||||
|
||||
// Store the File in the slot and update the handle in the VirtualFile
|
||||
@@ -331,11 +333,13 @@ impl Drop for VirtualFile {
|
||||
let mut slot_guard = slot.inner.write().unwrap();
|
||||
if slot_guard.tag == handle.tag {
|
||||
slot.recently_used.store(false, Ordering::Relaxed);
|
||||
// there is also operation "close-by-replace" for closes done on eviction for
|
||||
// comparison.
|
||||
// Unlike files evicted by replacement algorithm, here
|
||||
// we group close time by tenant_id/timeline_id.
|
||||
// At allows to compare number/time of "normal" file closes
|
||||
// with file eviction.
|
||||
STORAGE_IO_TIME
|
||||
.with_label_values(&["close"])
|
||||
.observe_closure_duration(|| drop(slot_guard.file.take()));
|
||||
.with_label_values(&["close", &self.tenant_id, &self.timeline_id])
|
||||
.observe_closure_duration(|| slot_guard.file.take());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -48,14 +48,6 @@ impl ClientCredentials<'_> {
|
||||
}
|
||||
|
||||
impl<'a> ClientCredentials<'a> {
|
||||
#[cfg(test)]
|
||||
pub fn new_noop() -> Self {
|
||||
ClientCredentials {
|
||||
user: "",
|
||||
project: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn parse(
|
||||
params: &'a StartupMessageParams,
|
||||
sni: Option<&str>,
|
||||
|
||||
@@ -1,9 +1,5 @@
|
||||
//! A group of high-level tests for connection establishing logic and auth.
|
||||
use std::borrow::Cow;
|
||||
|
||||
use super::*;
|
||||
use crate::auth::ClientCredentials;
|
||||
use crate::console::{CachedNodeInfo, NodeInfo};
|
||||
use crate::{auth, sasl, scram};
|
||||
use async_trait::async_trait;
|
||||
use rstest::rstest;
|
||||
@@ -308,148 +304,3 @@ fn connect_compute_total_wait() {
|
||||
assert!(total_wait < tokio::time::Duration::from_secs(12));
|
||||
assert!(total_wait > tokio::time::Duration::from_secs(10));
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
enum ConnectAction {
|
||||
Connect,
|
||||
Retry,
|
||||
Fail,
|
||||
}
|
||||
|
||||
struct TestConnectMechanism {
|
||||
counter: Arc<std::sync::Mutex<usize>>,
|
||||
sequence: Vec<ConnectAction>,
|
||||
}
|
||||
|
||||
impl TestConnectMechanism {
|
||||
fn new(sequence: Vec<ConnectAction>) -> Self {
|
||||
Self {
|
||||
counter: Arc::new(std::sync::Mutex::new(0)),
|
||||
sequence,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct TestConnection;
|
||||
|
||||
#[derive(Debug)]
|
||||
struct TestConnectError {
|
||||
retryable: bool,
|
||||
}
|
||||
|
||||
impl std::fmt::Display for TestConnectError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{:?}", self)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for TestConnectError {}
|
||||
|
||||
impl ShouldRetry for TestConnectError {
|
||||
fn could_retry(&self) -> bool {
|
||||
self.retryable
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ConnectMechanism for TestConnectMechanism {
|
||||
type Connection = TestConnection;
|
||||
type ConnectError = TestConnectError;
|
||||
type Error = anyhow::Error;
|
||||
|
||||
async fn connect_once(
|
||||
&self,
|
||||
_node_info: &console::CachedNodeInfo,
|
||||
_timeout: time::Duration,
|
||||
) -> Result<Self::Connection, Self::ConnectError> {
|
||||
let mut counter = self.counter.lock().unwrap();
|
||||
let action = self.sequence[*counter];
|
||||
*counter += 1;
|
||||
match action {
|
||||
ConnectAction::Connect => Ok(TestConnection),
|
||||
ConnectAction::Retry => Err(TestConnectError { retryable: true }),
|
||||
ConnectAction::Fail => Err(TestConnectError { retryable: false }),
|
||||
}
|
||||
}
|
||||
|
||||
fn update_connect_config(&self, _conf: &mut compute::ConnCfg) {}
|
||||
}
|
||||
|
||||
fn helper_create_connect_info() -> (
|
||||
CachedNodeInfo,
|
||||
console::ConsoleReqExtra<'static>,
|
||||
auth::BackendType<'static, ClientCredentials<'static>>,
|
||||
) {
|
||||
let node = NodeInfo {
|
||||
config: compute::ConnCfg::new(),
|
||||
aux: Default::default(),
|
||||
allow_self_signed_compute: false,
|
||||
};
|
||||
let cache = CachedNodeInfo::new_uncached(node);
|
||||
let extra = console::ConsoleReqExtra {
|
||||
session_id: uuid::Uuid::new_v4(),
|
||||
application_name: Some("TEST"),
|
||||
};
|
||||
let url = "https://TEST_URL".parse().unwrap();
|
||||
let api = console::provider::mock::Api::new(url);
|
||||
let creds = auth::BackendType::Postgres(Cow::Owned(api), ClientCredentials::new_noop());
|
||||
(cache, extra, creds)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn connect_to_compute_success() {
|
||||
use ConnectAction::*;
|
||||
let mechanism = TestConnectMechanism::new(vec![Connect]);
|
||||
let (cache, extra, creds) = helper_create_connect_info();
|
||||
connect_to_compute(&mechanism, cache, &extra, &creds)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn connect_to_compute_retry() {
|
||||
use ConnectAction::*;
|
||||
let mechanism = TestConnectMechanism::new(vec![Retry, Retry, Connect]);
|
||||
let (cache, extra, creds) = helper_create_connect_info();
|
||||
connect_to_compute(&mechanism, cache, &extra, &creds)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
/// Test that we don't retry if the error is not retryable.
|
||||
#[tokio::test]
|
||||
async fn connect_to_compute_non_retry_1() {
|
||||
use ConnectAction::*;
|
||||
let mechanism = TestConnectMechanism::new(vec![Retry, Retry, Fail]);
|
||||
let (cache, extra, creds) = helper_create_connect_info();
|
||||
connect_to_compute(&mechanism, cache, &extra, &creds)
|
||||
.await
|
||||
.unwrap_err();
|
||||
}
|
||||
|
||||
/// Even for non-retryable errors, we should retry at least once.
|
||||
#[tokio::test]
|
||||
async fn connect_to_compute_non_retry_2() {
|
||||
use ConnectAction::*;
|
||||
let mechanism = TestConnectMechanism::new(vec![Fail, Retry, Connect]);
|
||||
let (cache, extra, creds) = helper_create_connect_info();
|
||||
connect_to_compute(&mechanism, cache, &extra, &creds)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
/// Retry for at most `NUM_RETRIES_CONNECT` times.
|
||||
#[tokio::test]
|
||||
async fn connect_to_compute_non_retry_3() {
|
||||
assert_eq!(NUM_RETRIES_CONNECT, 10);
|
||||
use ConnectAction::*;
|
||||
let mechanism = TestConnectMechanism::new(vec![
|
||||
Retry, Retry, Retry, Retry, Retry, Retry, Retry, Retry, Retry, Retry,
|
||||
/* the 11th time */ Retry,
|
||||
]);
|
||||
let (cache, extra, creds) = helper_create_connect_info();
|
||||
connect_to_compute(&mechanism, cache, &extra, &creds)
|
||||
.await
|
||||
.unwrap_err();
|
||||
}
|
||||
|
||||
@@ -40,13 +40,10 @@ def parse_metrics(text: str, name: str = "") -> Metrics:
|
||||
return metrics
|
||||
|
||||
|
||||
def histogram(prefix_without_trailing_underscore: str) -> List[str]:
|
||||
assert not prefix_without_trailing_underscore.endswith("_")
|
||||
return [f"{prefix_without_trailing_underscore}_{x}" for x in ["bucket", "count", "sum"]]
|
||||
|
||||
|
||||
PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS: Tuple[str, ...] = (
|
||||
"pageserver_remote_timeline_client_calls_unfinished",
|
||||
*[f"pageserver_remote_timeline_client_calls_started_{x}" for x in ["bucket", "count", "sum"]],
|
||||
*[f"pageserver_remote_operation_seconds_{x}" for x in ["bucket", "count", "sum"]],
|
||||
"pageserver_remote_physical_size",
|
||||
"pageserver_remote_timeline_client_bytes_started_total",
|
||||
"pageserver_remote_timeline_client_bytes_finished_total",
|
||||
@@ -70,29 +67,34 @@ PAGESERVER_GLOBAL_METRICS: Tuple[str, ...] = (
|
||||
"pageserver_getpage_reconstruct_seconds_count",
|
||||
"pageserver_getpage_reconstruct_seconds_sum",
|
||||
*[f"pageserver_basebackup_query_seconds_{x}" for x in ["bucket", "count", "sum"]],
|
||||
*histogram("pageserver_read_num_fs_layers"),
|
||||
*histogram("pageserver_getpage_get_reconstruct_data_seconds"),
|
||||
*histogram("pageserver_wait_lsn_seconds"),
|
||||
*histogram("pageserver_remote_operation_seconds"),
|
||||
*histogram("pageserver_remote_timeline_client_calls_started"),
|
||||
*histogram("pageserver_io_operations_seconds"),
|
||||
"pageserver_tenant_states_count",
|
||||
)
|
||||
|
||||
PAGESERVER_PER_TENANT_METRICS: Tuple[str, ...] = (
|
||||
"pageserver_current_logical_size",
|
||||
"pageserver_resident_physical_size",
|
||||
"pageserver_getpage_get_reconstruct_data_seconds_bucket",
|
||||
"pageserver_getpage_get_reconstruct_data_seconds_count",
|
||||
"pageserver_getpage_get_reconstruct_data_seconds_sum",
|
||||
"pageserver_io_operations_bytes_total",
|
||||
"pageserver_io_operations_seconds_bucket",
|
||||
"pageserver_io_operations_seconds_count",
|
||||
"pageserver_io_operations_seconds_sum",
|
||||
"pageserver_last_record_lsn",
|
||||
"pageserver_read_num_fs_layers_bucket",
|
||||
"pageserver_read_num_fs_layers_count",
|
||||
"pageserver_read_num_fs_layers_sum",
|
||||
"pageserver_smgr_query_seconds_bucket",
|
||||
"pageserver_smgr_query_seconds_count",
|
||||
"pageserver_smgr_query_seconds_sum",
|
||||
"pageserver_storage_operations_seconds_count_total",
|
||||
"pageserver_storage_operations_seconds_sum_total",
|
||||
"pageserver_wait_lsn_seconds_bucket",
|
||||
"pageserver_wait_lsn_seconds_count",
|
||||
"pageserver_wait_lsn_seconds_sum",
|
||||
"pageserver_created_persistent_files_total",
|
||||
"pageserver_written_persistent_bytes_total",
|
||||
"pageserver_tenant_states_count",
|
||||
"pageserver_evictions_total",
|
||||
"pageserver_evictions_with_low_residence_duration_total",
|
||||
*PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS,
|
||||
# pageserver_broken_tenants_count is a leaked "metric" which is "cleared" on restart or reload
|
||||
)
|
||||
|
||||
@@ -136,6 +136,8 @@ def test_gc_index_upload(neon_env_builder: NeonEnvBuilder, remote_storage_kind:
|
||||
for sample in ps_metrics.query_all(
|
||||
name="pageserver_remote_operation_seconds_count",
|
||||
filter={
|
||||
"tenant_id": str(tenant_id),
|
||||
"timeline_id": str(timeline_id),
|
||||
"file_kind": str(file_kind),
|
||||
"op_kind": str(op_kind),
|
||||
},
|
||||
|
||||
@@ -140,6 +140,8 @@ def test_metric_collection(
|
||||
for sample in ps_metrics.query_all(
|
||||
name="pageserver_remote_operation_seconds_count",
|
||||
filter={
|
||||
"tenant_id": str(tenant_id),
|
||||
"timeline_id": str(timeline_id),
|
||||
"file_kind": str(file_kind),
|
||||
"op_kind": str(op_kind),
|
||||
},
|
||||
|
||||
@@ -8,10 +8,6 @@ from fixtures.utils import query_scalar
|
||||
# Now this test is very minimalistic -
|
||||
# it only checks next_multixact_id field in restored pg_control,
|
||||
# since we don't have functions to check multixact internals.
|
||||
# We do check that the datadir contents exported from the
|
||||
# pageserver match what the running PostgreSQL produced. This
|
||||
# is enough to verify that the WAL records are handled correctly
|
||||
# in the pageserver.
|
||||
#
|
||||
def test_multixact(neon_simple_env: NeonEnv, test_output_dir):
|
||||
env = neon_simple_env
|
||||
@@ -22,8 +18,8 @@ def test_multixact(neon_simple_env: NeonEnv, test_output_dir):
|
||||
cur = endpoint.connect().cursor()
|
||||
cur.execute(
|
||||
"""
|
||||
CREATE TABLE t1(i int primary key, n_updated int);
|
||||
INSERT INTO t1 select g, 0 from generate_series(1, 50) g;
|
||||
CREATE TABLE t1(i int primary key);
|
||||
INSERT INTO t1 select * from generate_series(1, 100);
|
||||
"""
|
||||
)
|
||||
|
||||
@@ -33,7 +29,6 @@ def test_multixact(neon_simple_env: NeonEnv, test_output_dir):
|
||||
|
||||
# Lock entries using parallel connections in a round-robin fashion.
|
||||
nclients = 20
|
||||
update_every = 97
|
||||
connections = []
|
||||
for _ in range(nclients):
|
||||
# Do not turn on autocommit. We want to hold the key-share locks.
|
||||
@@ -41,20 +36,14 @@ def test_multixact(neon_simple_env: NeonEnv, test_output_dir):
|
||||
connections.append(conn)
|
||||
|
||||
# On each iteration, we commit the previous transaction on a connection,
|
||||
# and issue another select. Each SELECT generates a new multixact that
|
||||
# and issue antoher select. Each SELECT generates a new multixact that
|
||||
# includes the new XID, and the XIDs of all the other parallel transactions.
|
||||
# This generates enough traffic on both multixact offsets and members SLRUs
|
||||
# to cross page boundaries.
|
||||
for i in range(20000):
|
||||
for i in range(5000):
|
||||
conn = connections[i % nclients]
|
||||
conn.commit()
|
||||
|
||||
# Perform some non-key UPDATEs too, to exercise different multixact
|
||||
# member statuses.
|
||||
if i % update_every == 0:
|
||||
conn.cursor().execute(f"update t1 set n_updated = n_updated + 1 where i = {i % 50}")
|
||||
else:
|
||||
conn.cursor().execute("select * from t1 for key share")
|
||||
conn.cursor().execute("select * from t1 for key share")
|
||||
|
||||
# We have multixacts now. We can close the connections.
|
||||
for c in connections:
|
||||
|
||||
@@ -27,16 +27,15 @@ from fixtures.types import Lsn
|
||||
from fixtures.utils import query_scalar, wait_until
|
||||
|
||||
|
||||
def get_num_downloaded_layers(client: PageserverHttpClient):
|
||||
"""
|
||||
This assumes that the pageserver only has a single tenant.
|
||||
"""
|
||||
def get_num_downloaded_layers(client: PageserverHttpClient, tenant_id, timeline_id):
|
||||
value = client.get_metric_value(
|
||||
"pageserver_remote_operation_seconds_count",
|
||||
{
|
||||
"file_kind": "layer",
|
||||
"op_kind": "download",
|
||||
"status": "success",
|
||||
"tenant_id": tenant_id,
|
||||
"timeline_id": timeline_id,
|
||||
},
|
||||
)
|
||||
if value is None:
|
||||
@@ -58,8 +57,7 @@ def test_ondemand_download_large_rel(
|
||||
test_name="test_ondemand_download_large_rel",
|
||||
)
|
||||
|
||||
# thinking about using a shared environment? the test assumes that global
|
||||
# metrics are for single tenant.
|
||||
##### First start, insert secret data and upload it to the remote storage
|
||||
env = neon_env_builder.init_start(
|
||||
initial_tenant_conf={
|
||||
# disable background GC
|
||||
@@ -131,7 +129,7 @@ def test_ondemand_download_large_rel(
|
||||
# safekeepers, that have now been shut down.
|
||||
endpoint = env.endpoints.create_start("main", lsn=current_lsn)
|
||||
|
||||
before_downloads = get_num_downloaded_layers(client)
|
||||
before_downloads = get_num_downloaded_layers(client, tenant_id, timeline_id)
|
||||
assert before_downloads != 0, "basebackup should on-demand non-zero layers"
|
||||
|
||||
# Probe in the middle of the table. There's a high chance that the beginning
|
||||
@@ -142,7 +140,7 @@ def test_ondemand_download_large_rel(
|
||||
with endpoint.cursor() as cur:
|
||||
assert query_scalar(cur, "select count(*) from tbl where id = 500000") == 1
|
||||
|
||||
after_downloads = get_num_downloaded_layers(client)
|
||||
after_downloads = get_num_downloaded_layers(client, tenant_id, timeline_id)
|
||||
log.info(f"layers downloaded before {before_downloads} and after {after_downloads}")
|
||||
assert after_downloads > before_downloads
|
||||
|
||||
@@ -161,11 +159,13 @@ def test_ondemand_download_timetravel(
|
||||
test_name="test_ondemand_download_timetravel",
|
||||
)
|
||||
|
||||
# thinking about using a shared environment? the test assumes that global
|
||||
# metrics are for single tenant.
|
||||
##### First start, insert data and upload it to the remote storage
|
||||
env = neon_env_builder.init_start()
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
|
||||
env = neon_env_builder.init_start(
|
||||
initial_tenant_conf={
|
||||
# Override defaults, to create more layers
|
||||
tenant, _ = env.neon_cli.create_tenant(
|
||||
conf={
|
||||
# Disable background GC & compaction
|
||||
# We don't want GC, that would break the assertion about num downloads.
|
||||
# We don't want background compaction, we force a compaction every time we do explicit checkpoint.
|
||||
@@ -178,7 +178,7 @@ def test_ondemand_download_timetravel(
|
||||
"compaction_target_size": f"{1 * 1024 ** 2}", # 1 MB
|
||||
}
|
||||
)
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
env.initial_tenant = tenant
|
||||
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
|
||||
@@ -283,7 +283,7 @@ def test_ondemand_download_timetravel(
|
||||
== table_len
|
||||
)
|
||||
|
||||
after_downloads = get_num_downloaded_layers(client)
|
||||
after_downloads = get_num_downloaded_layers(client, tenant_id, timeline_id)
|
||||
num_layers_downloaded.append(after_downloads)
|
||||
log.info(f"num_layers_downloaded[-1]={num_layers_downloaded[-1]}")
|
||||
|
||||
@@ -324,8 +324,11 @@ def test_download_remote_layers_api(
|
||||
)
|
||||
|
||||
##### First start, insert data and upload it to the remote storage
|
||||
env = neon_env_builder.init_start(
|
||||
initial_tenant_conf={
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
# Override defaults, to create more layers
|
||||
tenant, _ = env.neon_cli.create_tenant(
|
||||
conf={
|
||||
# Disable background GC & compaction
|
||||
# We don't want GC, that would break the assertion about num downloads.
|
||||
# We don't want background compaction, we force a compaction every time we do explicit checkpoint.
|
||||
@@ -338,6 +341,7 @@ def test_download_remote_layers_api(
|
||||
"compaction_target_size": f"{1 * 1024 ** 2}", # 1 MB
|
||||
}
|
||||
)
|
||||
env.initial_tenant = tenant
|
||||
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
|
||||
@@ -485,6 +489,8 @@ def test_compaction_downloads_on_demand_without_image_creation(
|
||||
test_name="test_compaction_downloads_on_demand_without_image_creation",
|
||||
)
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
conf = {
|
||||
# Disable background GC & compaction
|
||||
"gc_period": "0s",
|
||||
@@ -500,8 +506,6 @@ def test_compaction_downloads_on_demand_without_image_creation(
|
||||
# pitr_interval and gc_horizon are not interesting because we dont run gc
|
||||
}
|
||||
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=stringify(conf))
|
||||
|
||||
def downloaded_bytes_and_count(pageserver_http: PageserverHttpClient) -> Tuple[int, int]:
|
||||
m = pageserver_http.get_metrics()
|
||||
# these are global counters
|
||||
@@ -513,12 +517,11 @@ def test_compaction_downloads_on_demand_without_image_creation(
|
||||
assert count < 2**53 and count.is_integer(), "count should still be safe integer-in-f64"
|
||||
return (int(total_bytes), int(count))
|
||||
|
||||
# Override defaults, to create more layers
|
||||
tenant_id, timeline_id = env.neon_cli.create_tenant(conf=stringify(conf))
|
||||
env.initial_tenant = tenant_id
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
assert timeline_id is not None
|
||||
|
||||
with env.endpoints.create_start("main") as endpoint:
|
||||
# no particular reason to create the layers like this, but we are sure
|
||||
# not to hit the image_creation_threshold here.
|
||||
@@ -574,6 +577,8 @@ def test_compaction_downloads_on_demand_with_image_creation(
|
||||
test_name="test_compaction_downloads_on_demand",
|
||||
)
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
conf = {
|
||||
# Disable background GC & compaction
|
||||
"gc_period": "0s",
|
||||
@@ -588,11 +593,9 @@ def test_compaction_downloads_on_demand_with_image_creation(
|
||||
# pitr_interval and gc_horizon are not interesting because we dont run gc
|
||||
}
|
||||
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=stringify(conf))
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
assert timeline_id is not None
|
||||
|
||||
# Override defaults, to create more layers
|
||||
tenant_id, timeline_id = env.neon_cli.create_tenant(conf=stringify(conf))
|
||||
env.initial_tenant = tenant_id
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
@@ -661,6 +664,10 @@ def test_compaction_downloads_on_demand_with_image_creation(
|
||||
assert dict(kinds_after) == {"Delta": 4, "Image": 1}
|
||||
|
||||
|
||||
def stringify(conf: Dict[str, Any]) -> Dict[str, str]:
|
||||
return dict(map(lambda x: (x[0], str(x[1])), conf.items()))
|
||||
|
||||
|
||||
@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS])
|
||||
def test_ondemand_download_failure_to_replace(
|
||||
neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind
|
||||
@@ -684,10 +691,9 @@ def test_ondemand_download_failure_to_replace(
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
assert timeline_id is not None
|
||||
tenant_id, timeline_id = env.neon_cli.create_tenant()
|
||||
|
||||
env.initial_tenant = tenant_id
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
|
||||
lsn = Lsn(pageserver_http.timeline_detail(tenant_id, timeline_id)["last_record_lsn"])
|
||||
@@ -718,7 +724,3 @@ def test_ondemand_download_failure_to_replace(
|
||||
env.pageserver.allowed_errors.append(".* ERROR .*Task 'initial size calculation'")
|
||||
|
||||
# if the above returned, then we didn't have a livelock, and all is well
|
||||
|
||||
|
||||
def stringify(conf: Dict[str, Any]) -> Dict[str, str]:
|
||||
return dict(map(lambda x: (x[0], str(x[1])), conf.items()))
|
||||
|
||||
@@ -378,10 +378,12 @@ def test_remote_timeline_client_calls_started_metric(
|
||||
test_name="test_remote_timeline_client_metrics",
|
||||
)
|
||||
|
||||
# thinking about using a shared environment? the test assumes that global
|
||||
# metrics are for single tenant.
|
||||
env = neon_env_builder.init_start(
|
||||
initial_tenant_conf={
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
# create tenant with config that will determinstically allow
|
||||
# compaction and gc
|
||||
tenant_id, timeline_id = env.neon_cli.create_tenant(
|
||||
conf={
|
||||
# small checkpointing and compaction targets to ensure we generate many upload operations
|
||||
"checkpoint_distance": f"{128 * 1024}",
|
||||
"compaction_threshold": "1",
|
||||
@@ -396,10 +398,6 @@ def test_remote_timeline_client_calls_started_metric(
|
||||
}
|
||||
)
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
assert env.initial_timeline is not None
|
||||
timeline_id: TimelineId = env.initial_timeline
|
||||
|
||||
client = env.pageserver.http_client()
|
||||
|
||||
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
|
||||
@@ -421,7 +419,6 @@ def test_remote_timeline_client_calls_started_metric(
|
||||
"VACUUM foo",
|
||||
]
|
||||
)
|
||||
assert timeline_id is not None
|
||||
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
|
||||
|
||||
calls_started: Dict[Tuple[str, str], List[int]] = {
|
||||
@@ -431,14 +428,13 @@ def test_remote_timeline_client_calls_started_metric(
|
||||
}
|
||||
|
||||
def fetch_calls_started():
|
||||
assert timeline_id is not None
|
||||
for (file_kind, op_kind), observations in calls_started.items():
|
||||
val = client.get_metric_value(
|
||||
name="pageserver_remote_timeline_client_calls_started_count",
|
||||
filter={
|
||||
"file_kind": str(file_kind),
|
||||
"op_kind": str(op_kind),
|
||||
},
|
||||
val = client.get_remote_timeline_client_metric(
|
||||
"pageserver_remote_timeline_client_calls_started_count",
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
file_kind,
|
||||
op_kind,
|
||||
)
|
||||
assert val is not None, f"expecting metric to be present: {file_kind} {op_kind}"
|
||||
val = int(val)
|
||||
@@ -522,8 +518,12 @@ def test_timeline_deletion_with_files_stuck_in_upload_queue(
|
||||
test_name="test_timeline_deletion_with_files_stuck_in_upload_queue",
|
||||
)
|
||||
|
||||
env = neon_env_builder.init_start(
|
||||
initial_tenant_conf={
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
# create tenant with config that will determinstically allow
|
||||
# compaction and gc
|
||||
tenant_id, timeline_id = env.neon_cli.create_tenant(
|
||||
conf={
|
||||
# small checkpointing and compaction targets to ensure we generate many operations
|
||||
"checkpoint_distance": f"{64 * 1024}",
|
||||
"compaction_threshold": "1",
|
||||
@@ -535,10 +535,6 @@ def test_timeline_deletion_with_files_stuck_in_upload_queue(
|
||||
"pitr_interval": "0s",
|
||||
}
|
||||
)
|
||||
tenant_id = env.initial_tenant
|
||||
assert env.initial_timeline is not None
|
||||
timeline_id: TimelineId = env.initial_timeline
|
||||
|
||||
timeline_path = env.timeline_dir(tenant_id, timeline_id)
|
||||
|
||||
client = env.pageserver.http_client()
|
||||
@@ -791,8 +787,12 @@ def test_compaction_delete_before_upload(
|
||||
test_name="test_compaction_delete_before_upload",
|
||||
)
|
||||
|
||||
env = neon_env_builder.init_start(
|
||||
initial_tenant_conf={
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
# create tenant with config that will determinstically allow
|
||||
# compaction and disables gc
|
||||
tenant_id, timeline_id = env.neon_cli.create_tenant(
|
||||
conf={
|
||||
# Set a small compaction threshold
|
||||
"compaction_threshold": "3",
|
||||
# Disable GC
|
||||
@@ -802,10 +802,6 @@ def test_compaction_delete_before_upload(
|
||||
}
|
||||
)
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
assert env.initial_timeline is not None
|
||||
timeline_id: TimelineId = env.initial_timeline
|
||||
|
||||
client = env.pageserver.http_client()
|
||||
|
||||
with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint:
|
||||
|
||||
@@ -2,7 +2,6 @@ import asyncio
|
||||
import random
|
||||
import time
|
||||
from threading import Thread
|
||||
from typing import List, Optional
|
||||
|
||||
import asyncpg
|
||||
import pytest
|
||||
@@ -22,7 +21,6 @@ from fixtures.pageserver.utils import (
|
||||
)
|
||||
from fixtures.types import Lsn, TenantId, TimelineId
|
||||
from fixtures.utils import query_scalar, wait_until
|
||||
from prometheus_client.samples import Sample
|
||||
|
||||
|
||||
def do_gc_target(
|
||||
@@ -856,89 +854,3 @@ def ensure_test_data(data_id: int, data: str, endpoint: Endpoint):
|
||||
assert (
|
||||
query_scalar(cur, f"SELECT secret FROM test WHERE id = {data_id};") == data
|
||||
), "Should have timeline data back"
|
||||
|
||||
|
||||
@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS])
|
||||
def test_metrics_while_ignoring_broken_tenant_and_reloading(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
remote_storage_kind: RemoteStorageKind,
|
||||
):
|
||||
neon_env_builder.enable_remote_storage(
|
||||
remote_storage_kind=remote_storage_kind,
|
||||
test_name="test_metrics_while_ignoring_broken_tenant_and_reloading",
|
||||
)
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
client = env.pageserver.http_client()
|
||||
env.pageserver.allowed_errors.append(
|
||||
r".* Changing Active tenant to Broken state, reason: broken from test"
|
||||
)
|
||||
|
||||
def only_int(samples: List[Sample]) -> Optional[int]:
|
||||
if len(samples) == 1:
|
||||
return int(samples[0].value)
|
||||
assert len(samples) == 0
|
||||
return None
|
||||
|
||||
wait_until_tenant_state(client, env.initial_tenant, "Active", 10, 0.5)
|
||||
|
||||
client.tenant_break(env.initial_tenant)
|
||||
|
||||
found_broken = False
|
||||
active, broken, broken_set = ([], [], [])
|
||||
for _ in range(10):
|
||||
m = client.get_metrics()
|
||||
active = m.query_all("pageserver_tenant_states_count", {"state": "Active"})
|
||||
broken = m.query_all("pageserver_tenant_states_count", {"state": "Broken"})
|
||||
broken_set = m.query_all(
|
||||
"pageserver_broken_tenants_count", {"tenant_id": str(env.initial_tenant)}
|
||||
)
|
||||
found_broken = only_int(active) == 0 and only_int(broken) == 1 and only_int(broken_set) == 1
|
||||
|
||||
if found_broken:
|
||||
break
|
||||
log.info(f"active: {active}, broken: {broken}, broken_set: {broken_set}")
|
||||
time.sleep(0.5)
|
||||
assert (
|
||||
found_broken
|
||||
), f"tenant shows up as broken; active={active}, broken={broken}, broken_set={broken_set}"
|
||||
|
||||
client.tenant_ignore(env.initial_tenant)
|
||||
|
||||
found_broken = False
|
||||
broken, broken_set = ([], [])
|
||||
for _ in range(10):
|
||||
m = client.get_metrics()
|
||||
broken = m.query_all("pageserver_tenant_states_count", {"state": "Broken"})
|
||||
broken_set = m.query_all(
|
||||
"pageserver_broken_tenants_count", {"tenant_id": str(env.initial_tenant)}
|
||||
)
|
||||
found_broken = only_int(broken) == 0 and only_int(broken_set) == 1
|
||||
|
||||
if found_broken:
|
||||
break
|
||||
time.sleep(0.5)
|
||||
assert (
|
||||
found_broken
|
||||
), f"broken should still be in set, but it is not in the tenant state count: broken={broken}, broken_set={broken_set}"
|
||||
|
||||
client.tenant_load(env.initial_tenant)
|
||||
|
||||
found_active = False
|
||||
active, broken_set = ([], [])
|
||||
for _ in range(10):
|
||||
m = client.get_metrics()
|
||||
active = m.query_all("pageserver_tenant_states_count", {"state": "Active"})
|
||||
broken_set = m.query_all(
|
||||
"pageserver_broken_tenants_count", {"tenant_id": str(env.initial_tenant)}
|
||||
)
|
||||
found_active = only_int(active) == 1 and len(broken_set) == 0
|
||||
|
||||
if found_active:
|
||||
break
|
||||
time.sleep(0.5)
|
||||
|
||||
assert (
|
||||
found_active
|
||||
), f"reloaded tenant should be active, and broken tenant set item removed: active={active}, broken_set={broken_set}"
|
||||
|
||||
@@ -213,9 +213,6 @@ def test_metrics_normal_work(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
# Test (a subset of) pageserver global metrics
|
||||
for metric in PAGESERVER_GLOBAL_METRICS:
|
||||
if metric.startswith("pageserver_remote"):
|
||||
continue
|
||||
|
||||
ps_samples = ps_metrics.query_all(metric, {})
|
||||
assert len(ps_samples) > 0, f"expected at least one sample for {metric}"
|
||||
for sample in ps_samples:
|
||||
@@ -383,8 +380,10 @@ def test_pageserver_with_empty_tenants(
|
||||
ps_metrics = client.get_metrics()
|
||||
broken_tenants_metric_filter = {
|
||||
"tenant_id": str(tenant_without_timelines_dir),
|
||||
"state": "Broken",
|
||||
}
|
||||
active_tenants_metric_filter = {
|
||||
"tenant_id": str(tenant_with_empty_timelines),
|
||||
"state": "Active",
|
||||
}
|
||||
|
||||
@@ -400,7 +399,7 @@ def test_pageserver_with_empty_tenants(
|
||||
|
||||
tenant_broken_count = int(
|
||||
ps_metrics.query_one(
|
||||
"pageserver_broken_tenants_count", filter=broken_tenants_metric_filter
|
||||
"pageserver_tenant_states_count", filter=broken_tenants_metric_filter
|
||||
).value
|
||||
)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user