From f6f947e4ec2809bddef56c6dd768c8a6da68a020 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Tue, 7 Jan 2025 12:26:04 +0100 Subject: [PATCH] Revert "debug cruft, likely will revert but this proved useful, esp log_if_slow" This reverts commit 0fd67b27e535b3ec5a667f1512fd2aec25a6ac92. Bunch of conflicts. --- Cargo.lock | 41 ------------------- libs/utils/Cargo.toml | 3 +- libs/utils/src/logging.rs | 18 +------- pageserver/Cargo.toml | 2 +- pageserver/src/lib.rs | 23 ----------- pageserver/src/page_service.rs | 6 +-- pageserver/src/tenant.rs | 2 +- pageserver/src/tenant/block_io.rs | 30 +++++--------- pageserver/src/tenant/disk_btree.rs | 6 +-- pageserver/src/tenant/storage_layer.rs | 9 ---- .../src/tenant/storage_layer/delta_layer.rs | 26 +----------- pageserver/src/tenant/timeline.rs | 14 +------ pageserver/src/virtual_file.rs | 11 ++--- pageserver/src/virtual_file/io_engine.rs | 17 +------- 14 files changed, 27 insertions(+), 181 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e059f4d0bf..3b1933df75 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1323,45 +1323,6 @@ dependencies = [ "crossbeam-utils", ] -[[package]] -name = "console-api" -version = "0.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8030735ecb0d128428b64cd379809817e620a40e5001c54465b99ec5feec2857" -dependencies = [ - "futures-core", - "prost", - "prost-types", - "tonic", - "tracing-core", -] - -[[package]] -name = "console-subscriber" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6539aa9c6a4cd31f4b1c040f860a1eac9aa80e7df6b05d506a6e7179936d6a01" -dependencies = [ - "console-api", - "crossbeam-channel", - "crossbeam-utils", - "futures-task", - "hdrhistogram", - "humantime", - "hyper-util", - "prost", - "prost-types", - "serde", - "serde_json", - "thread_local", - "tokio", - "tokio-stream", - "tonic", - "tracing", - "tracing-core", - "tracing-subscriber", -] - [[package]] name = "const-oid" version = "0.9.6" @@ -6675,7 +6636,6 @@ dependencies = [ "signal-hook-registry", "socket2", "tokio-macros", - "tracing", "windows-sys 0.48.0", ] @@ -7282,7 +7242,6 @@ dependencies = [ "camino", "camino-tempfile", "chrono", - "console-subscriber", "const_format", "criterion", "diatomic-waker", diff --git a/libs/utils/Cargo.toml b/libs/utils/Cargo.toml index cb4626fac8..02bf77760a 100644 --- a/libs/utils/Cargo.toml +++ b/libs/utils/Cargo.toml @@ -20,7 +20,6 @@ bincode.workspace = true bytes.workspace = true camino.workspace = true chrono.workspace = true -console-subscriber = "*" diatomic-waker.workspace = true flate2.workspace = true git-version.workspace = true @@ -43,7 +42,7 @@ serde_with.workspace = true serde_json.workspace = true signal-hook.workspace = true thiserror.workspace = true -tokio = {workspace = true, features = ["tracing"] } +tokio.workspace = true tokio-tar.workspace = true tokio-util.workspace = true toml_edit = { workspace = true, features = ["serde"] } diff --git a/libs/utils/src/logging.rs b/libs/utils/src/logging.rs index 0f20718eee..e205d60d74 100644 --- a/libs/utils/src/logging.rs +++ b/libs/utils/src/logging.rs @@ -1,12 +1,10 @@ -use std::{num::NonZeroUsize, str::FromStr}; +use std::str::FromStr; use anyhow::Context; use metrics::{IntCounter, IntCounterVec}; use once_cell::sync::Lazy; use strum_macros::{EnumString, VariantNames}; -use crate::env; - #[derive(EnumString, strum_macros::Display, VariantNames, Eq, PartialEq, Debug, Clone, Copy)] #[strum(serialize_all = "snake_case")] pub enum LogFormat { @@ -136,20 +134,6 @@ pub fn init( let r = r.with( TracingEventCountLayer(&TRACING_EVENT_COUNT_METRIC).with_filter(rust_log_env_filter()), ); - let r = r.with( - if let Some(n) = env::var("NEON_ENABLE_TOKIO_CONSOLE_SUBSCRIBER") { - let n: NonZeroUsize = n; - use console_subscriber::ConsoleLayer; - Some( - console_subscriber::Builder::default() - .event_buffer_capacity(n.get() * ConsoleLayer::DEFAULT_EVENT_BUFFER_CAPACITY) - .client_buffer_capacity(n.get() * ConsoleLayer::DEFAULT_CLIENT_BUFFER_CAPACITY) - .spawn(), - ) - } else { - None - }, - ); match tracing_error_layer_enablement { TracingErrorLayerEnablement::EnableWithRustLogFilter => r .with(tracing_error::ErrorLayer::default().with_filter(rust_log_env_filter())) diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index e33cbd98a1..140b287ccc 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -57,7 +57,7 @@ sysinfo.workspace = true tokio-tar.workspace = true thiserror.workspace = true tikv-jemallocator.workspace = true -tokio = { workspace = true, features = ["process", "sync", "fs", "rt", "io-util", "time", "tracing"] } +tokio = { workspace = true, features = ["process", "sync", "fs", "rt", "io-util", "time"] } tokio-epoll-uring.workspace = true tokio-io-timeout.workspace = true tokio-postgres.workspace = true diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 1ac7586769..ff6af3566c 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -375,29 +375,6 @@ async fn timed_after_cancellation( } } -async fn log_if_slow( - name: &str, - warn_at: std::time::Duration, - fut: Fut, -) -> ::Output { - let started = std::time::Instant::now(); - - let mut fut = std::pin::pin!(fut); - - match tokio::time::timeout(warn_at, &mut fut).await { - Ok(ret) => ret, - Err(_) => { - tracing::trace!( - what = name, - elapsed_ms = started.elapsed().as_millis(), - "slow" - ); - - fut.await - } - } -} - #[cfg(test)] mod timed_tests { use super::timed; diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index aa09d382eb..2c3c786f69 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -1246,7 +1246,6 @@ impl PageServerHandler { { let cancel = self.cancel.clone(); let err = loop { - trace!("waiting for message"); let msg = Self::pagestream_read_message( &mut pgb_reader, tenant_id, @@ -1257,7 +1256,6 @@ impl PageServerHandler { request_span.clone(), ) .await; - trace!(is_err = msg.is_err(), "message received"); let msg = match msg { Ok(msg) => msg, Err(e) => break e, @@ -1269,11 +1267,10 @@ impl PageServerHandler { return ((pgb_reader, timeline_handles), Ok(())); } }; - trace!("throttling message"); + if let Err(cancelled) = msg.throttle_and_record_start_processing(&self.cancel).await { break cancelled; } - trace!("handling message"); let err = self .pagesteam_handle_batched_message( @@ -1288,7 +1285,6 @@ impl PageServerHandler { Ok(()) => {} Err(e) => break e, } - trace!("message handled"); }; ((pgb_reader, timeline_handles), Err(err)) } diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 11cda886a6..ca3e958233 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -10011,7 +10011,7 @@ mod tests { let keyspace = KeySpace::single(get_key(0)..get_key(10)); let results = tline - .get_vectored(keyspace, delta_layer_end_lsn, &ctx) + .get_vectored(keyspace, delta_layer_end_lsn, IoConcurrency::todo(), &ctx) .await .expect("No vectored errors"); for (key, res) in results { diff --git a/pageserver/src/tenant/block_io.rs b/pageserver/src/tenant/block_io.rs index 52e09eb513..990211f80a 100644 --- a/pageserver/src/tenant/block_io.rs +++ b/pageserver/src/tenant/block_io.rs @@ -4,14 +4,12 @@ use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner}; use crate::context::RequestContext; -use crate::log_if_slow; use crate::page_cache::{self, FileId, PageReadGuard, PageWriteGuard, ReadBufResult, PAGE_SZ}; #[cfg(test)] use crate::virtual_file::IoBufferMut; use crate::virtual_file::VirtualFile; use bytes::Bytes; use std::ops::Deref; -use std::time::Duration; /// This is implemented by anything that can read 8 kB (PAGE_SZ) /// blocks, using the page cache @@ -213,27 +211,19 @@ impl<'a> FileBlockReader<'a> { ctx: &RequestContext, ) -> Result, std::io::Error> { let cache = page_cache::get(); - match log_if_slow( - "read_immutable_buf", - Duration::from_secs(1), - cache.read_immutable_buf(self.file_id, blknum, ctx), - ) - .await - .map_err(|e| { - std::io::Error::new( - std::io::ErrorKind::Other, - format!("Failed to read immutable buf: {e:#}"), - ) - })? { + match cache + .read_immutable_buf(self.file_id, blknum, ctx) + .await + .map_err(|e| { + std::io::Error::new( + std::io::ErrorKind::Other, + format!("Failed to read immutable buf: {e:#}"), + ) + })? { ReadBufResult::Found(guard) => Ok(guard.into()), ReadBufResult::NotFound(write_guard) => { // Read the page from disk into the buffer - let write_guard = log_if_slow( - "fill_buffer", - Duration::from_secs(1), - self.fill_buffer(write_guard, blknum, ctx), - ) - .await?; + let write_guard = self.fill_buffer(write_guard, blknum, ctx).await?; Ok(write_guard.mark_valid().into()) } } diff --git a/pageserver/src/tenant/disk_btree.rs b/pageserver/src/tenant/disk_btree.rs index dd32d5963d..c77342b144 100644 --- a/pageserver/src/tenant/disk_btree.rs +++ b/pageserver/src/tenant/disk_btree.rs @@ -30,14 +30,12 @@ use std::{ iter::Rev, ops::{Range, RangeInclusive}, result, - time::Duration, }; use thiserror::Error; use tracing::error; use crate::{ context::{DownloadBehavior, RequestContext}, - log_if_slow, task_mgr::TaskKind, tenant::block_io::{BlockReader, BlockWriter}, }; @@ -304,8 +302,8 @@ where // We could keep the page cache read guard alive, but, at the time of writing, // we run quite small PS PageCache s => can't risk running out of // PageCache space because this stream isn't consumed fast enough. - let page_read_guard = log_if_slow("read_blk", Duration::from_secs(1), block_cursor - .read_blk(self.start_blk + node_blknum, ctx)) + let page_read_guard = block_cursor + .read_blk(self.start_blk + node_blknum, ctx) .await?; node_buf.copy_from_slice(page_read_guard.as_ref()); drop(page_read_guard); // drop page cache read guard early diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 13710f93dc..39df7e4094 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -415,15 +415,6 @@ impl IoConcurrency { where F: std::future::Future + Send + 'static, { - static IO_NUM: AtomicUsize = AtomicUsize::new(0); - let io_num = IO_NUM.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - let fut = async move { - trace!("start"); - scopeguard::defer!(trace!("end")); - fut.await - } - .instrument(tracing::trace_span!("spawned_io", %io_num)); - tracing::trace!(%io_num, "spawning IO"); match self { IoConcurrency::Serial => fut.await, IoConcurrency::FuturesUnordered { ios_tx, .. } => { diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 4073c8bf62..6f58e7af5d 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -885,7 +885,6 @@ impl DeltaLayerInner { Ok(()) } - #[instrument(level = "trace", skip_all)] async fn plan_reads( keyspace: &KeySpace, lsn_range: Range, @@ -897,30 +896,18 @@ impl DeltaLayerInner { where Reader: BlockReader + Clone, { - trace!("enter"); - scopeguard::defer!({ - trace!("exit"); - }); - let ctx = RequestContextBuilder::extend(ctx) .page_content_kind(PageContentKind::DeltaLayerBtreeNode) .build(); - let nranges = keyspace.ranges.len(); - trace!("Planning reads for {nranges} ranges"); - for (i, range) in keyspace.ranges.iter().enumerate() { - trace!("range {i}/{nranges}"); + for range in keyspace.ranges.iter() { let mut range_end_handled = false; let start_key = DeltaKey::from_key_lsn(&range.start, lsn_range.start); let index_stream = index_reader.clone().into_stream(&start_key.0, &ctx); let mut index_stream = std::pin::pin!(index_stream); - let mut n = 0; while let Some(index_entry) = index_stream.next().await { - trace!("index entry {n}"); - n += 1; - let (raw_key, value) = index_entry?; let key = Key::from_slice(&raw_key[..KEY_SIZE]); let lsn = DeltaKey::extract_lsn_from_buf(&raw_key); @@ -999,18 +986,12 @@ impl DeltaLayerInner { largest_read_size } - #[instrument(level = "trace", skip_all)] async fn do_reads_and_update_state( &self, reads: Vec, reconstruct_state: &mut ValuesReconstructState, ctx: &RequestContext, ) { - trace!("enter"); - scopeguard::defer!({ - trace!("exit"); - }); - let max_vectored_read_bytes = self .max_vectored_read_bytes .expect("Layer is loaded with max vectored bytes config") @@ -1023,10 +1004,7 @@ impl DeltaLayerInner { // Note that reads are processed in reverse order (from highest key+lsn). // This is the order that `ReconstructState` requires such that it can // track when a key is done. - let reads_len = reads.len(); - trace!("Processing {reads_len} reads"); - for (i, read) in reads.into_iter().rev().enumerate() { - trace!("read {i}/{reads_len}"); + for read in reads.into_iter().rev() { let mut senders: HashMap< (Key, Lsn), sync::oneshot::Sender>, diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 80545dbe50..5c9079a4fc 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -56,7 +56,7 @@ use utils::{ }; use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta}; -use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering}; +use std::sync::atomic::Ordering as AtomicOrdering; use std::sync::{Arc, Mutex, RwLock, Weak}; use std::time::{Duration, Instant, SystemTime}; use std::{ @@ -1148,7 +1148,6 @@ impl Timeline { vectored_res } - #[instrument(level = "trace", skip_all, fields(request_num = tracing::field::Empty))] pub(super) async fn get_vectored_impl( &self, keyspace: KeySpace, @@ -1162,11 +1161,6 @@ impl Timeline { GetKind::Vectored }; - static REQUEST_NUM: AtomicUsize = AtomicUsize::new(0); - let request_num = REQUEST_NUM.fetch_add(1, AtomicOrdering::Relaxed); - tracing::Span::current().record("request_num", &request_num); - - trace!("getting reconstruct data"); let get_data_timer = crate::metrics::GET_RECONSTRUCT_DATA_TIME .for_get_kind(get_kind) .start_timer(); @@ -1179,7 +1173,6 @@ impl Timeline { .start_timer(); let layers_visited = reconstruct_state.get_layers_visited(); - trace!("waiting for reconstruct data and reconstructing values"); let futs = FuturesUnordered::new(); for (key, state) in std::mem::take(&mut reconstruct_state.keys) { futs.push({ @@ -1187,7 +1180,6 @@ impl Timeline { async move { assert_eq!(state.situation, ValueReconstructSituation::Complete); - trace!("collecting pending IOs"); let converted = match state.collect_pending_ios().await { Ok(ok) => ok, Err(err) => { @@ -1195,13 +1187,11 @@ impl Timeline { } }; - trace!("reconstructing value"); ( key, walredo_self.reconstruct_value(key, lsn, converted).await, ) } - .instrument(tracing::trace_span!("key_loop", key = %key, lsn = lsn.0)) }); } @@ -3419,7 +3409,7 @@ impl Timeline { } } - // TODO: internalize + // TODO: move this to a function trace!("waiting for futures to complete"); match &reconstruct_state.io_concurrency { super::storage_layer::IoConcurrency::Serial => (), diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 3f93404ece..8a7f4a4bf5 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -12,7 +12,6 @@ //! src/backend/storage/file/fd.c //! use crate::context::RequestContext; -use crate::log_if_slow; use crate::metrics::{StorageIoOperation, STORAGE_IO_SIZE, STORAGE_IO_TIME_METRIC}; use crate::page_cache::{PageWriteGuard, PAGE_SZ}; @@ -29,7 +28,6 @@ use std::fs::File; use std::io::{Error, ErrorKind, Seek, SeekFrom}; #[cfg(target_os = "linux")] use std::os::unix::fs::OpenOptionsExt; -use std::time::Duration; use tokio_epoll_uring::{BoundedBuf, IoBuf, IoBufMut, Slice}; use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd}; @@ -936,11 +934,10 @@ impl VirtualFileInner { where Buf: tokio_epoll_uring::IoBufMut + Send, { - let file_guard = - match log_if_slow("lock_file", Duration::from_secs(1), self.lock_file()).await { - Ok(file_guard) => file_guard, - Err(e) => return (buf, Err(e)), - }; + let file_guard = match self.lock_file().await { + Ok(file_guard) => file_guard, + Err(e) => return (buf, Err(e)), + }; observe_duration!(StorageIoOperation::Read, { let ((_file_guard, buf), res) = io_engine::get().read_at(file_guard, offset, buf).await; diff --git a/pageserver/src/virtual_file/io_engine.rs b/pageserver/src/virtual_file/io_engine.rs index 9aa9170bbd..ccde90ee1a 100644 --- a/pageserver/src/virtual_file/io_engine.rs +++ b/pageserver/src/virtual_file/io_engine.rs @@ -15,9 +15,6 @@ pub(super) mod tokio_epoll_uring_ext; use tokio_epoll_uring::IoBuf; use tracing::Instrument; -#[cfg(target_os = "linux")] -use {crate::log_if_slow, std::time::Duration}; - pub(crate) use super::api::IoEngineKind; #[derive(Clone, Copy)] #[repr(u8)] @@ -152,18 +149,8 @@ impl IoEngine { } #[cfg(target_os = "linux")] IoEngine::TokioEpollUring => { - let system = log_if_slow( - "thread_local_system", - Duration::from_secs(1), - tokio_epoll_uring_ext::thread_local_system(), - ) - .await; - let (resources, res) = log_if_slow( - "system.read", - Duration::from_secs(1), - system.read(file_guard, offset, slice), - ) - .await; + let system = tokio_epoll_uring_ext::thread_local_system().await; + let (resources, res) = system.read(file_guard, offset, slice).await; (resources, res.map_err(epoll_uring_error_to_std)) } }