mirror of https://github.com/neondatabase/neon.git
Revert "debug cruft, likely will revert but this proved useful, esp log_if_slow"
This reverts commit 0fd67b27e5.
Bunch of conflicts.
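
In summary, the revert removes: the console-subscriber dependency and the NEON_ENABLE_TOKIO_CONSOLE_SUBSCRIBER tokio-console hook in logging init; tokio's "tracing" cargo feature; the log_if_slow helper together with its call sites (read_immutable_buf, fill_buffer, read_blk, lock_file, thread_local_system, system.read); and the ad-hoc trace!/#[instrument] statements in the page service loop, delta layer read planning, and the timeline get_vectored path.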
Cargo.lock (generated): 41 lines changed

@@ -1323,45 +1323,6 @@ dependencies = [
  "crossbeam-utils",
 ]
 
-[[package]]
-name = "console-api"
-version = "0.8.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8030735ecb0d128428b64cd379809817e620a40e5001c54465b99ec5feec2857"
-dependencies = [
- "futures-core",
- "prost",
- "prost-types",
- "tonic",
- "tracing-core",
-]
-
-[[package]]
-name = "console-subscriber"
-version = "0.4.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6539aa9c6a4cd31f4b1c040f860a1eac9aa80e7df6b05d506a6e7179936d6a01"
-dependencies = [
- "console-api",
- "crossbeam-channel",
- "crossbeam-utils",
- "futures-task",
- "hdrhistogram",
- "humantime",
- "hyper-util",
- "prost",
- "prost-types",
- "serde",
- "serde_json",
- "thread_local",
- "tokio",
- "tokio-stream",
- "tonic",
- "tracing",
- "tracing-core",
- "tracing-subscriber",
-]
-
 [[package]]
 name = "const-oid"
 version = "0.9.6"
@@ -6675,7 +6636,6 @@ dependencies = [
  "signal-hook-registry",
  "socket2",
  "tokio-macros",
- "tracing",
  "windows-sys 0.48.0",
 ]
 
@@ -7282,7 +7242,6 @@ dependencies = [
  "camino",
  "camino-tempfile",
  "chrono",
- "console-subscriber",
  "const_format",
  "criterion",
  "diatomic-waker",

@@ -20,7 +20,6 @@ bincode.workspace = true
 bytes.workspace = true
 camino.workspace = true
 chrono.workspace = true
-console-subscriber = "*"
 diatomic-waker.workspace = true
 flate2.workspace = true
 git-version.workspace = true
@@ -43,7 +42,7 @@ serde_with.workspace = true
 serde_json.workspace = true
 signal-hook.workspace = true
 thiserror.workspace = true
-tokio = {workspace = true, features = ["tracing"] }
+tokio.workspace = true
 tokio-tar.workspace = true
 tokio-util.workspace = true
 toml_edit = { workspace = true, features = ["serde"] }

@@ -1,12 +1,10 @@
-use std::{num::NonZeroUsize, str::FromStr};
+use std::str::FromStr;
 
 use anyhow::Context;
 use metrics::{IntCounter, IntCounterVec};
 use once_cell::sync::Lazy;
 use strum_macros::{EnumString, VariantNames};
 
-use crate::env;
-
 #[derive(EnumString, strum_macros::Display, VariantNames, Eq, PartialEq, Debug, Clone, Copy)]
 #[strum(serialize_all = "snake_case")]
 pub enum LogFormat {
@@ -136,20 +134,6 @@ pub fn init(
     let r = r.with(
         TracingEventCountLayer(&TRACING_EVENT_COUNT_METRIC).with_filter(rust_log_env_filter()),
     );
-    let r = r.with(
-        if let Some(n) = env::var("NEON_ENABLE_TOKIO_CONSOLE_SUBSCRIBER") {
-            let n: NonZeroUsize = n;
-            use console_subscriber::ConsoleLayer;
-            Some(
-                console_subscriber::Builder::default()
-                    .event_buffer_capacity(n.get() * ConsoleLayer::DEFAULT_EVENT_BUFFER_CAPACITY)
-                    .client_buffer_capacity(n.get() * ConsoleLayer::DEFAULT_CLIENT_BUFFER_CAPACITY)
-                    .spawn(),
-            )
-        } else {
-            None
-        },
-    );
     match tracing_error_layer_enablement {
         TracingErrorLayerEnablement::EnableWithRustLogFilter => r
             .with(tracing_error::ErrorLayer::default().with_filter(rust_log_env_filter()))

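For reference, the wiring removed above boils down to conditionally attaching a tokio-console layer to a tracing-subscriber registry. A minimal standalone sketch of the same idea, reading the env var directly via std::env (init_tracing_with_console and the fmt layer are illustrative, not part of this codebase):

use std::num::NonZeroUsize;
use tracing_subscriber::prelude::*;

fn init_tracing_with_console() {
    // NEON_ENABLE_TOKIO_CONSOLE_SUBSCRIBER=<n> scales the console buffers by n;
    // unset (or unparsable) leaves the layer disabled.
    let console_layer = std::env::var("NEON_ENABLE_TOKIO_CONSOLE_SUBSCRIBER")
        .ok()
        .and_then(|v| v.parse::<NonZeroUsize>().ok())
        .map(|n| {
            use console_subscriber::ConsoleLayer;
            console_subscriber::Builder::default()
                .event_buffer_capacity(n.get() * ConsoleLayer::DEFAULT_EVENT_BUFFER_CAPACITY)
                .client_buffer_capacity(n.get() * ConsoleLayer::DEFAULT_CLIENT_BUFFER_CAPACITY)
                .spawn()
        });

    tracing_subscriber::registry()
        .with(console_layer) // Option<L: Layer> is itself a Layer; None is a no-op
        .with(tracing_subscriber::fmt::layer())
        .init();
}

Note that tokio-console only produces task data when tokio is built with its "tracing" feature (removed from both Cargo.toml files in this revert) and with RUSTFLAGS="--cfg tokio_unstable".
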
@@ -57,7 +57,7 @@ sysinfo.workspace = true
 tokio-tar.workspace = true
 thiserror.workspace = true
 tikv-jemallocator.workspace = true
-tokio = { workspace = true, features = ["process", "sync", "fs", "rt", "io-util", "time", "tracing"] }
+tokio = { workspace = true, features = ["process", "sync", "fs", "rt", "io-util", "time"] }
 tokio-epoll-uring.workspace = true
 tokio-io-timeout.workspace = true
 tokio-postgres.workspace = true

@@ -375,29 +375,6 @@ async fn timed_after_cancellation<Fut: std::future::Future>(
     }
 }
 
-async fn log_if_slow<Fut: std::future::Future>(
-    name: &str,
-    warn_at: std::time::Duration,
-    fut: Fut,
-) -> <Fut as std::future::Future>::Output {
-    let started = std::time::Instant::now();
-
-    let mut fut = std::pin::pin!(fut);
-
-    match tokio::time::timeout(warn_at, &mut fut).await {
-        Ok(ret) => ret,
-        Err(_) => {
-            tracing::trace!(
-                what = name,
-                elapsed_ms = started.elapsed().as_millis(),
-                "slow"
-            );
-
-            fut.await
-        }
-    }
-}
-
 #[cfg(test)]
 mod timed_tests {
     use super::timed;

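Since the commit message singles out log_if_slow as the part that proved useful, here is a sketch of how the helper removed above is used at a call site; slow_io and the one-second budget are made up for illustration:

use std::time::Duration;

// Stand-in for a real async I/O operation.
async fn slow_io() -> Vec<u8> {
    tokio::time::sleep(Duration::from_secs(2)).await;
    vec![0u8; 8192]
}

async fn example() {
    // If slow_io() exceeds the 1s budget, log_if_slow emits one trace event
    // with the elapsed time and then keeps polling the same pinned future
    // to completion; the operation itself is never cancelled.
    let page = log_if_slow("slow_io", Duration::from_secs(1), slow_io()).await;
    assert_eq!(page.len(), 8192);
}

The key design point is that tokio::time::timeout is used only as a deadline probe: on timeout the helper logs and then re-awaits &mut fut, so the wrapped I/O always runs to completion.
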
@@ -1246,7 +1246,6 @@ impl PageServerHandler {
     {
         let cancel = self.cancel.clone();
         let err = loop {
-            trace!("waiting for message");
            let msg = Self::pagestream_read_message(
                 &mut pgb_reader,
                 tenant_id,
@@ -1257,7 +1256,6 @@ impl PageServerHandler {
                 request_span.clone(),
             )
             .await;
-            trace!(is_err = msg.is_err(), "message received");
             let msg = match msg {
                 Ok(msg) => msg,
                 Err(e) => break e,
@@ -1269,11 +1267,10 @@ impl PageServerHandler {
                     return ((pgb_reader, timeline_handles), Ok(()));
                 }
             };
-            trace!("throttling message");
 
             if let Err(cancelled) = msg.throttle_and_record_start_processing(&self.cancel).await {
                 break cancelled;
             }
-            trace!("handling message");
+
             let err = self
                 .pagesteam_handle_batched_message(
@@ -1288,7 +1285,6 @@ impl PageServerHandler {
                 Ok(()) => {}
                 Err(e) => break e,
             }
-            trace!("message handled");
         };
         ((pgb_reader, timeline_handles), Err(err))
     }

@@ -10011,7 +10011,7 @@ mod tests {
 
         let keyspace = KeySpace::single(get_key(0)..get_key(10));
         let results = tline
-            .get_vectored(keyspace, delta_layer_end_lsn, &ctx)
+            .get_vectored(keyspace, delta_layer_end_lsn, IoConcurrency::todo(), &ctx)
             .await
             .expect("No vectored errors");
         for (key, res) in results {

@@ -4,14 +4,12 @@
 
 use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
 use crate::context::RequestContext;
-use crate::log_if_slow;
 use crate::page_cache::{self, FileId, PageReadGuard, PageWriteGuard, ReadBufResult, PAGE_SZ};
 #[cfg(test)]
 use crate::virtual_file::IoBufferMut;
 use crate::virtual_file::VirtualFile;
 use bytes::Bytes;
 use std::ops::Deref;
-use std::time::Duration;
 
 /// This is implemented by anything that can read 8 kB (PAGE_SZ)
 /// blocks, using the page cache
@@ -213,27 +211,19 @@ impl<'a> FileBlockReader<'a> {
         ctx: &RequestContext,
     ) -> Result<BlockLease<'b>, std::io::Error> {
         let cache = page_cache::get();
-        match log_if_slow(
-            "read_immutable_buf",
-            Duration::from_secs(1),
-            cache.read_immutable_buf(self.file_id, blknum, ctx),
-        )
-        .await
-        .map_err(|e| {
-            std::io::Error::new(
-                std::io::ErrorKind::Other,
-                format!("Failed to read immutable buf: {e:#}"),
-            )
-        })? {
+        match cache
+            .read_immutable_buf(self.file_id, blknum, ctx)
+            .await
+            .map_err(|e| {
+                std::io::Error::new(
+                    std::io::ErrorKind::Other,
+                    format!("Failed to read immutable buf: {e:#}"),
+                )
+            })? {
             ReadBufResult::Found(guard) => Ok(guard.into()),
             ReadBufResult::NotFound(write_guard) => {
                 // Read the page from disk into the buffer
-                let write_guard = log_if_slow(
-                    "fill_buffer",
-                    Duration::from_secs(1),
-                    self.fill_buffer(write_guard, blknum, ctx),
-                )
-                .await?;
+                let write_guard = self.fill_buffer(write_guard, blknum, ctx).await?;
                 Ok(write_guard.mark_valid().into())
             }
         }

@@ -30,14 +30,12 @@ use std::{
     iter::Rev,
     ops::{Range, RangeInclusive},
     result,
-    time::Duration,
 };
 use thiserror::Error;
 use tracing::error;
 
 use crate::{
     context::{DownloadBehavior, RequestContext},
-    log_if_slow,
     task_mgr::TaskKind,
     tenant::block_io::{BlockReader, BlockWriter},
 };
@@ -304,8 +302,8 @@ where
                 // We could keep the page cache read guard alive, but, at the time of writing,
                 // we run quite small PS PageCache s => can't risk running out of
                 // PageCache space because this stream isn't consumed fast enough.
-                let page_read_guard = log_if_slow("read_blk", Duration::from_secs(1), block_cursor
-                    .read_blk(self.start_blk + node_blknum, ctx))
+                let page_read_guard = block_cursor
+                    .read_blk(self.start_blk + node_blknum, ctx)
                     .await?;
                 node_buf.copy_from_slice(page_read_guard.as_ref());
                 drop(page_read_guard); // drop page cache read guard early

@@ -415,15 +415,6 @@ impl IoConcurrency {
     where
         F: std::future::Future<Output = ()> + Send + 'static,
     {
-        static IO_NUM: AtomicUsize = AtomicUsize::new(0);
-        let io_num = IO_NUM.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
-        let fut = async move {
-            trace!("start");
-            scopeguard::defer!(trace!("end"));
-            fut.await
-        }
-        .instrument(tracing::trace_span!("spawned_io", %io_num));
-        tracing::trace!(%io_num, "spawning IO");
         match self {
             IoConcurrency::Serial => fut.await,
             IoConcurrency::FuturesUnordered { ios_tx, .. } => {

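The block removed above is a small traceability pattern: number each spawned IO from a global counter and wrap the future in a span so its start and end are visible in logs. A self-contained sketch of the same idea (run_numbered_io is an illustrative name; it assumes the scopeguard crate, as the original does):

use std::sync::atomic::{AtomicUsize, Ordering};
use tracing::{trace, Instrument};

async fn run_numbered_io<F: std::future::Future<Output = ()>>(fut: F) {
    static IO_NUM: AtomicUsize = AtomicUsize::new(0);
    let io_num = IO_NUM.fetch_add(1, Ordering::Relaxed);
    async move {
        trace!("start");
        // defer! runs on drop, so "end" is logged even if `fut` panics.
        scopeguard::defer!(trace!("end"));
        fut.await
    }
    // All events inside inherit the spawned_io span and its io_num field.
    .instrument(tracing::trace_span!("spawned_io", %io_num))
    .await;
}
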
@@ -885,7 +885,6 @@ impl DeltaLayerInner {
         Ok(())
     }
 
-    #[instrument(level = "trace", skip_all)]
     async fn plan_reads<Reader>(
         keyspace: &KeySpace,
         lsn_range: Range<Lsn>,
@@ -897,30 +896,18 @@ impl DeltaLayerInner {
     where
         Reader: BlockReader + Clone,
     {
-        trace!("enter");
-        scopeguard::defer!({
-            trace!("exit");
-        });
-
         let ctx = RequestContextBuilder::extend(ctx)
             .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
             .build();
 
-        let nranges = keyspace.ranges.len();
-        trace!("Planning reads for {nranges} ranges");
-        for (i, range) in keyspace.ranges.iter().enumerate() {
-            trace!("range {i}/{nranges}");
+        for range in keyspace.ranges.iter() {
             let mut range_end_handled = false;
 
             let start_key = DeltaKey::from_key_lsn(&range.start, lsn_range.start);
             let index_stream = index_reader.clone().into_stream(&start_key.0, &ctx);
             let mut index_stream = std::pin::pin!(index_stream);
 
-            let mut n = 0;
             while let Some(index_entry) = index_stream.next().await {
-                trace!("index entry {n}");
-                n += 1;
-
                 let (raw_key, value) = index_entry?;
                 let key = Key::from_slice(&raw_key[..KEY_SIZE]);
                 let lsn = DeltaKey::extract_lsn_from_buf(&raw_key);
@@ -999,18 +986,12 @@ impl DeltaLayerInner {
         largest_read_size
     }
 
-    #[instrument(level = "trace", skip_all)]
     async fn do_reads_and_update_state(
         &self,
         reads: Vec<VectoredRead>,
         reconstruct_state: &mut ValuesReconstructState,
         ctx: &RequestContext,
     ) {
-        trace!("enter");
-        scopeguard::defer!({
-            trace!("exit");
-        });
-
         let max_vectored_read_bytes = self
             .max_vectored_read_bytes
             .expect("Layer is loaded with max vectored bytes config")
@@ -1023,10 +1004,7 @@ impl DeltaLayerInner {
         // Note that reads are processed in reverse order (from highest key+lsn).
         // This is the order that `ReconstructState` requires such that it can
         // track when a key is done.
-        let reads_len = reads.len();
-        trace!("Processing {reads_len} reads");
-        for (i, read) in reads.into_iter().rev().enumerate() {
-            trace!("read {i}/{reads_len}");
+        for read in reads.into_iter().rev() {
             let mut senders: HashMap<
                 (Key, Lsn),
                 sync::oneshot::Sender<Result<OnDiskValue, std::io::Error>>,

@@ -56,7 +56,7 @@ use utils::{
 };
 use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta};
 
-use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
+use std::sync::atomic::Ordering as AtomicOrdering;
 use std::sync::{Arc, Mutex, RwLock, Weak};
 use std::time::{Duration, Instant, SystemTime};
 use std::{
@@ -1148,7 +1148,6 @@ impl Timeline {
         vectored_res
     }
 
-    #[instrument(level = "trace", skip_all, fields(request_num = tracing::field::Empty))]
     pub(super) async fn get_vectored_impl(
         &self,
         keyspace: KeySpace,
@@ -1162,11 +1161,6 @@ impl Timeline {
             GetKind::Vectored
         };
 
-        static REQUEST_NUM: AtomicUsize = AtomicUsize::new(0);
-        let request_num = REQUEST_NUM.fetch_add(1, AtomicOrdering::Relaxed);
-        tracing::Span::current().record("request_num", &request_num);
-
-        trace!("getting reconstruct data");
         let get_data_timer = crate::metrics::GET_RECONSTRUCT_DATA_TIME
             .for_get_kind(get_kind)
             .start_timer();
@@ -1179,7 +1173,6 @@ impl Timeline {
             .start_timer();
         let layers_visited = reconstruct_state.get_layers_visited();
 
-        trace!("waiting for reconstruct data and reconstructing values");
         let futs = FuturesUnordered::new();
         for (key, state) in std::mem::take(&mut reconstruct_state.keys) {
             futs.push({
@@ -1187,7 +1180,6 @@ impl Timeline {
                 async move {
                     assert_eq!(state.situation, ValueReconstructSituation::Complete);
 
-                    trace!("collecting pending IOs");
                     let converted = match state.collect_pending_ios().await {
                         Ok(ok) => ok,
                         Err(err) => {
@@ -1195,13 +1187,11 @@ impl Timeline {
                         }
                     };
 
-                    trace!("reconstructing value");
                     (
                         key,
                         walredo_self.reconstruct_value(key, lsn, converted).await,
                     )
                 }
-                .instrument(tracing::trace_span!("key_loop", key = %key, lsn = lsn.0))
             });
         }
 
@@ -3419,7 +3409,7 @@ impl Timeline {
             }
         }
 
-        // TODO: internalize
+        // TODO: move this to a function
         trace!("waiting for futures to complete");
         match &reconstruct_state.io_concurrency {
             super::storage_layer::IoConcurrency::Serial => (),

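Also removed above is the request-numbering pattern in get_vectored_impl: declare an empty span field via #[instrument], then fill it at runtime from a global counter so every trace event of one call shares a request_num. A minimal sketch (handle_request is illustrative):

use std::sync::atomic::{AtomicUsize, Ordering};

#[tracing::instrument(level = "trace", skip_all, fields(request_num = tracing::field::Empty))]
async fn handle_request() {
    // Take a process-wide sequence number and record it into the span
    // field that was declared Empty above.
    static REQUEST_NUM: AtomicUsize = AtomicUsize::new(0);
    let request_num = REQUEST_NUM.fetch_add(1, Ordering::Relaxed);
    tracing::Span::current().record("request_num", request_num);

    // ... request handling; events here carry request_num ...
}
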
@@ -12,7 +12,6 @@
 //! src/backend/storage/file/fd.c
 //!
 use crate::context::RequestContext;
-use crate::log_if_slow;
 use crate::metrics::{StorageIoOperation, STORAGE_IO_SIZE, STORAGE_IO_TIME_METRIC};
 
 use crate::page_cache::{PageWriteGuard, PAGE_SZ};
@@ -29,7 +28,6 @@ use std::fs::File;
 use std::io::{Error, ErrorKind, Seek, SeekFrom};
 #[cfg(target_os = "linux")]
 use std::os::unix::fs::OpenOptionsExt;
-use std::time::Duration;
 use tokio_epoll_uring::{BoundedBuf, IoBuf, IoBufMut, Slice};
 
 use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
@@ -936,11 +934,10 @@ impl VirtualFileInner {
     where
         Buf: tokio_epoll_uring::IoBufMut + Send,
     {
-        let file_guard =
-            match log_if_slow("lock_file", Duration::from_secs(1), self.lock_file()).await {
-                Ok(file_guard) => file_guard,
-                Err(e) => return (buf, Err(e)),
-            };
+        let file_guard = match self.lock_file().await {
+            Ok(file_guard) => file_guard,
+            Err(e) => return (buf, Err(e)),
+        };
 
         observe_duration!(StorageIoOperation::Read, {
             let ((_file_guard, buf), res) = io_engine::get().read_at(file_guard, offset, buf).await;

@@ -15,9 +15,6 @@ pub(super) mod tokio_epoll_uring_ext;
 use tokio_epoll_uring::IoBuf;
 use tracing::Instrument;
 
-#[cfg(target_os = "linux")]
-use {crate::log_if_slow, std::time::Duration};
-
 pub(crate) use super::api::IoEngineKind;
 #[derive(Clone, Copy)]
 #[repr(u8)]
@@ -152,18 +149,8 @@ impl IoEngine {
             }
             #[cfg(target_os = "linux")]
             IoEngine::TokioEpollUring => {
-                let system = log_if_slow(
-                    "thread_local_system",
-                    Duration::from_secs(1),
-                    tokio_epoll_uring_ext::thread_local_system(),
-                )
-                .await;
-                let (resources, res) = log_if_slow(
-                    "system.read",
-                    Duration::from_secs(1),
-                    system.read(file_guard, offset, slice),
-                )
-                .await;
+                let system = tokio_epoll_uring_ext::thread_local_system().await;
+                let (resources, res) = system.read(file_guard, offset, slice).await;
                 (resources, res.map_err(epoll_uring_error_to_std))
             }
         }