mirror of
https://github.com/neondatabase/neon.git
synced 2026-03-07 02:10:37 +00:00
Compare commits
15 Commits
proxy-http
...
add-profil
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8657f8e0c0 | ||
|
|
c358014389 | ||
|
|
c0af1a5044 | ||
|
|
094e7606a4 | ||
|
|
dd4ff8ef9e | ||
|
|
13b076401a | ||
|
|
5615aca244 | ||
|
|
922c4f07d5 | ||
|
|
0671fdd265 | ||
|
|
3f4fd576c6 | ||
|
|
4790f8725e | ||
|
|
ea309b2a5b | ||
|
|
8568456f95 | ||
|
|
d9af548ca8 | ||
|
|
128176702e |
1600
Cargo.lock
generated
1600
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -255,6 +255,7 @@ pub enum GenericRemoteStorage {
|
|||||||
AwsS3(Arc<S3Bucket>),
|
AwsS3(Arc<S3Bucket>),
|
||||||
AzureBlob(Arc<AzureBlobStorage>),
|
AzureBlob(Arc<AzureBlobStorage>),
|
||||||
Unreliable(Arc<UnreliableWrapper>),
|
Unreliable(Arc<UnreliableWrapper>),
|
||||||
|
Nothing,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl GenericRemoteStorage {
|
impl GenericRemoteStorage {
|
||||||
@@ -268,6 +269,7 @@ impl GenericRemoteStorage {
|
|||||||
Self::AwsS3(s) => s.list(prefix, mode).await,
|
Self::AwsS3(s) => s.list(prefix, mode).await,
|
||||||
Self::AzureBlob(s) => s.list(prefix, mode).await,
|
Self::AzureBlob(s) => s.list(prefix, mode).await,
|
||||||
Self::Unreliable(s) => s.list(prefix, mode).await,
|
Self::Unreliable(s) => s.list(prefix, mode).await,
|
||||||
|
Self::Nothing => unimplemented!(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -280,6 +282,7 @@ impl GenericRemoteStorage {
|
|||||||
Self::AwsS3(s) => s.list_files(folder).await,
|
Self::AwsS3(s) => s.list_files(folder).await,
|
||||||
Self::AzureBlob(s) => s.list_files(folder).await,
|
Self::AzureBlob(s) => s.list_files(folder).await,
|
||||||
Self::Unreliable(s) => s.list_files(folder).await,
|
Self::Unreliable(s) => s.list_files(folder).await,
|
||||||
|
Self::Nothing => unimplemented!(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -295,6 +298,7 @@ impl GenericRemoteStorage {
|
|||||||
Self::AwsS3(s) => s.list_prefixes(prefix).await,
|
Self::AwsS3(s) => s.list_prefixes(prefix).await,
|
||||||
Self::AzureBlob(s) => s.list_prefixes(prefix).await,
|
Self::AzureBlob(s) => s.list_prefixes(prefix).await,
|
||||||
Self::Unreliable(s) => s.list_prefixes(prefix).await,
|
Self::Unreliable(s) => s.list_prefixes(prefix).await,
|
||||||
|
Self::Nothing => unimplemented!(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -310,6 +314,7 @@ impl GenericRemoteStorage {
|
|||||||
Self::AwsS3(s) => s.upload(from, data_size_bytes, to, metadata).await,
|
Self::AwsS3(s) => s.upload(from, data_size_bytes, to, metadata).await,
|
||||||
Self::AzureBlob(s) => s.upload(from, data_size_bytes, to, metadata).await,
|
Self::AzureBlob(s) => s.upload(from, data_size_bytes, to, metadata).await,
|
||||||
Self::Unreliable(s) => s.upload(from, data_size_bytes, to, metadata).await,
|
Self::Unreliable(s) => s.upload(from, data_size_bytes, to, metadata).await,
|
||||||
|
Self::Nothing => Ok(()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -319,6 +324,7 @@ impl GenericRemoteStorage {
|
|||||||
Self::AwsS3(s) => s.download(from).await,
|
Self::AwsS3(s) => s.download(from).await,
|
||||||
Self::AzureBlob(s) => s.download(from).await,
|
Self::AzureBlob(s) => s.download(from).await,
|
||||||
Self::Unreliable(s) => s.download(from).await,
|
Self::Unreliable(s) => s.download(from).await,
|
||||||
|
Self::Nothing => unimplemented!(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -345,6 +351,7 @@ impl GenericRemoteStorage {
|
|||||||
s.download_byte_range(from, start_inclusive, end_exclusive)
|
s.download_byte_range(from, start_inclusive, end_exclusive)
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
Self::Nothing => unimplemented!(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -354,6 +361,7 @@ impl GenericRemoteStorage {
|
|||||||
Self::AwsS3(s) => s.delete(path).await,
|
Self::AwsS3(s) => s.delete(path).await,
|
||||||
Self::AzureBlob(s) => s.delete(path).await,
|
Self::AzureBlob(s) => s.delete(path).await,
|
||||||
Self::Unreliable(s) => s.delete(path).await,
|
Self::Unreliable(s) => s.delete(path).await,
|
||||||
|
Self::Nothing => Ok(()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -363,6 +371,7 @@ impl GenericRemoteStorage {
|
|||||||
Self::AwsS3(s) => s.delete_objects(paths).await,
|
Self::AwsS3(s) => s.delete_objects(paths).await,
|
||||||
Self::AzureBlob(s) => s.delete_objects(paths).await,
|
Self::AzureBlob(s) => s.delete_objects(paths).await,
|
||||||
Self::Unreliable(s) => s.delete_objects(paths).await,
|
Self::Unreliable(s) => s.delete_objects(paths).await,
|
||||||
|
Self::Nothing => Ok(()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -384,6 +393,7 @@ impl GenericRemoteStorage {
|
|||||||
azure_config.container_name, azure_config.container_region, azure_config.prefix_in_container);
|
azure_config.container_name, azure_config.container_region, azure_config.prefix_in_container);
|
||||||
Self::AzureBlob(Arc::new(AzureBlobStorage::new(azure_config)?))
|
Self::AzureBlob(Arc::new(AzureBlobStorage::new(azure_config)?))
|
||||||
}
|
}
|
||||||
|
RemoteStorageKind::Nothing => Self::Nothing,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -438,6 +448,8 @@ pub struct RemoteStorageConfig {
|
|||||||
/// A kind of a remote storage to connect to, with its connection configuration.
|
/// A kind of a remote storage to connect to, with its connection configuration.
|
||||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
pub enum RemoteStorageKind {
|
pub enum RemoteStorageKind {
|
||||||
|
/// For microbenchmarks it's useful to turn off remote storage
|
||||||
|
Nothing,
|
||||||
/// Storage based on local file system.
|
/// Storage based on local file system.
|
||||||
/// Specify a root folder to place all stored files into.
|
/// Specify a root folder to place all stored files into.
|
||||||
LocalFs(Utf8PathBuf),
|
LocalFs(Utf8PathBuf),
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ default = []
|
|||||||
# Enables test-only APIs, incuding failpoints. In particular, enables the `fail_point!` macro,
|
# Enables test-only APIs, incuding failpoints. In particular, enables the `fail_point!` macro,
|
||||||
# which adds some runtime cost to run tests on outage conditions
|
# which adds some runtime cost to run tests on outage conditions
|
||||||
testing = ["fail/failpoints"]
|
testing = ["fail/failpoints"]
|
||||||
|
profiling = ["pprof"]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
anyhow.workspace = true
|
anyhow.workspace = true
|
||||||
@@ -83,6 +84,7 @@ enum-map.workspace = true
|
|||||||
enumset.workspace = true
|
enumset.workspace = true
|
||||||
strum.workspace = true
|
strum.workspace = true
|
||||||
strum_macros.workspace = true
|
strum_macros.workspace = true
|
||||||
|
pprof = { git = "https://github.com/neondatabase/pprof-rs.git", branch = "wallclock-profiling", features = ["flamegraph"], optional = true }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
criterion.workspace = true
|
criterion.workspace = true
|
||||||
|
|||||||
@@ -49,6 +49,8 @@ const PID_FILE_NAME: &str = "pageserver.pid";
|
|||||||
const FEATURES: &[&str] = &[
|
const FEATURES: &[&str] = &[
|
||||||
#[cfg(feature = "testing")]
|
#[cfg(feature = "testing")]
|
||||||
"testing",
|
"testing",
|
||||||
|
#[cfg(feature = "profiling")]
|
||||||
|
"profiling",
|
||||||
];
|
];
|
||||||
|
|
||||||
fn version() -> String {
|
fn version() -> String {
|
||||||
|
|||||||
@@ -15,6 +15,7 @@ pub mod metrics;
|
|||||||
pub mod page_cache;
|
pub mod page_cache;
|
||||||
pub mod page_service;
|
pub mod page_service;
|
||||||
pub mod pgdatadir_mapping;
|
pub mod pgdatadir_mapping;
|
||||||
|
pub mod profiling;
|
||||||
pub mod repository;
|
pub mod repository;
|
||||||
pub(crate) mod statvfs;
|
pub(crate) mod statvfs;
|
||||||
pub mod task_mgr;
|
pub mod task_mgr;
|
||||||
|
|||||||
87
pageserver/src/profiling.rs
Normal file
87
pageserver/src/profiling.rs
Normal file
@@ -0,0 +1,87 @@
|
|||||||
|
//!
|
||||||
|
//! Support for profiling
|
||||||
|
//!
|
||||||
|
//! This relies on a modified version of the 'pprof-rs' crate. That's not very
|
||||||
|
//! nice, so to avoid a hard dependency on that, this is an optional feature.
|
||||||
|
//!
|
||||||
|
|
||||||
|
/// The actual implementation is in the `profiling_impl` submodule. If the profiling
|
||||||
|
/// feature is not enabled, it's just a dummy implementation that panics if you
|
||||||
|
/// try to enabled profiling in the configuration.
|
||||||
|
pub use profiling_impl::*;
|
||||||
|
|
||||||
|
#[cfg(feature = "profiling")]
|
||||||
|
mod profiling_impl {
|
||||||
|
use super::*;
|
||||||
|
use pprof;
|
||||||
|
use std::marker::PhantomData;
|
||||||
|
|
||||||
|
/// Start profiling the current thread. Returns a guard object;
|
||||||
|
/// the profiling continues until the guard is dropped.
|
||||||
|
///
|
||||||
|
/// Note: profiling is not re-entrant. If you call 'profpoint_start' while
|
||||||
|
/// profiling is already started, nothing happens, and the profiling will be
|
||||||
|
/// stopped when either guard object is dropped.
|
||||||
|
#[inline]
|
||||||
|
pub fn profpoint_start() -> Option<ProfilingGuard> {
|
||||||
|
pprof::start_profiling();
|
||||||
|
Some(ProfilingGuard(PhantomData))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A hack to remove Send and Sync from the ProfilingGuard. Because the
|
||||||
|
/// profiling is attached to current thread.
|
||||||
|
////
|
||||||
|
/// See comments in https://github.com/rust-lang/rust/issues/68318
|
||||||
|
type PhantomUnsend = std::marker::PhantomData<*mut u8>;
|
||||||
|
|
||||||
|
pub struct ProfilingGuard(PhantomUnsend);
|
||||||
|
|
||||||
|
unsafe impl Send for ProfilingGuard {}
|
||||||
|
|
||||||
|
impl Drop for ProfilingGuard {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
pprof::stop_profiling();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Initialize the profiler. This must be called before any 'profpoint_start' calls.
|
||||||
|
pub fn init_profiler<'a>() -> Option<pprof::ProfilerGuard<'a>> {
|
||||||
|
Some(pprof::ProfilerGuardBuilder::default().build().unwrap())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Exit the profiler. Writes the flamegraph to current workdir.
|
||||||
|
pub fn exit_profiler(profiler_guard: &Option<pprof::ProfilerGuard>) {
|
||||||
|
// Write out the flamegraph
|
||||||
|
if let Some(profiler_guard) = profiler_guard {
|
||||||
|
if let Ok(report) = profiler_guard.report().build() {
|
||||||
|
// this gets written under the workdir
|
||||||
|
let file = std::fs::File::create("flamegraph.svg").unwrap();
|
||||||
|
let mut options = pprof::flamegraph::Options::default();
|
||||||
|
options.image_width = Some(2500);
|
||||||
|
report.flamegraph_with_options(file, &mut options).unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Dummy implementation when compiling without profiling feature or for non-linux OSes.
|
||||||
|
#[cfg(not(feature = "profiling"))]
|
||||||
|
mod profiling_impl {
|
||||||
|
pub struct DummyProfilerGuard;
|
||||||
|
|
||||||
|
impl Drop for DummyProfilerGuard {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
// do nothing, this exists to calm Clippy down
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn profpoint_start() -> Option<DummyProfilerGuard> {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn init_profiler() -> Option<DummyProfilerGuard> {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn exit_profiler(profiler_guard: &Option<DummyProfilerGuard>) {}
|
||||||
|
}
|
||||||
@@ -3760,7 +3760,10 @@ pub(crate) mod harness {
|
|||||||
let remote_fs_dir = conf.workdir.join("localfs");
|
let remote_fs_dir = conf.workdir.join("localfs");
|
||||||
std::fs::create_dir_all(&remote_fs_dir).unwrap();
|
std::fs::create_dir_all(&remote_fs_dir).unwrap();
|
||||||
let config = RemoteStorageConfig {
|
let config = RemoteStorageConfig {
|
||||||
storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
|
storage: RemoteStorageKind::Nothing,
|
||||||
|
|
||||||
|
// TODO use the following for tests that need it:
|
||||||
|
// storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
|
||||||
};
|
};
|
||||||
let remote_storage = GenericRemoteStorage::from_config(&config).unwrap();
|
let remote_storage = GenericRemoteStorage::from_config(&config).unwrap();
|
||||||
let deletion_queue = MockDeletionQueue::new(Some(remote_storage.clone()));
|
let deletion_queue = MockDeletionQueue::new(Some(remote_storage.clone()));
|
||||||
|
|||||||
@@ -2113,21 +2113,6 @@ mod tests {
|
|||||||
let startpoint = Lsn::from_hex("14AEC08").unwrap();
|
let startpoint = Lsn::from_hex("14AEC08").unwrap();
|
||||||
let endpoint = Lsn::from_hex("1FFFF98").unwrap();
|
let endpoint = Lsn::from_hex("1FFFF98").unwrap();
|
||||||
|
|
||||||
// Bootstrap a real timeline. We can't use create_test_timeline because
|
|
||||||
// it doesn't create a real checkpoint, and Walingest::new tries to parse
|
|
||||||
// the garbage data.
|
|
||||||
//
|
|
||||||
// TODO use the initdb.tar.zst file stored with the test data to avoid
|
|
||||||
// problems with inconsistent initdb results after pg minor version bumps.
|
|
||||||
let (tenant, ctx) = TenantHarness::create("test_ingest_real_wal")
|
|
||||||
.unwrap()
|
|
||||||
.load()
|
|
||||||
.await;
|
|
||||||
let tline = tenant
|
|
||||||
.bootstrap_timeline(TIMELINE_ID, pg_version, None, &ctx)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
// We fully read and decompress this into memory before decoding
|
// We fully read and decompress this into memory before decoding
|
||||||
// to get a more accurate perf profile of the decoder.
|
// to get a more accurate perf profile of the decoder.
|
||||||
let bytes = {
|
let bytes = {
|
||||||
@@ -2141,32 +2126,72 @@ mod tests {
|
|||||||
buffer
|
buffer
|
||||||
};
|
};
|
||||||
|
|
||||||
// TODO start a profiler too
|
// Allow number of iterations to be configured via env var, which is
|
||||||
let started_at = std::time::Instant::now();
|
// useful when using this test for benchmarking.
|
||||||
|
let n_iterations: usize =
|
||||||
|
std::env::var("NUM_TEST_ITERATIONS")
|
||||||
|
.map(|s| s.parse().unwrap())
|
||||||
|
.unwrap_or(1);
|
||||||
|
let profiler = crate::profiling::init_profiler();
|
||||||
|
|
||||||
// Initialize walingest
|
for iteration in 0..n_iterations {
|
||||||
let xlogoff: usize = startpoint.segment_offset(WAL_SEGMENT_SIZE);
|
// Bootstrap a real timeline. We can't use create_test_timeline because
|
||||||
let mut decoder = WalStreamDecoder::new(startpoint, pg_version);
|
// it doesn't create a real checkpoint, and Walingest::new tries to parse
|
||||||
let mut walingest = WalIngest::new(tline.as_ref(), startpoint, &ctx)
|
// the garbage data.
|
||||||
.await
|
//
|
||||||
.unwrap();
|
// TODO use the initdb.tar.zst file stored with the test data to avoid
|
||||||
let mut modification = tline.begin_modification(endpoint);
|
// problems with inconsistent initdb results after pg minor version bumps.
|
||||||
let mut decoded = DecodedWALRecord::default();
|
let (tenant, ctx) = TenantHarness::create("test_ingest_real_wal")
|
||||||
println!("decoding {} bytes", bytes.len() - xlogoff);
|
.unwrap()
|
||||||
|
.load()
|
||||||
|
.await;
|
||||||
|
let tline = tenant
|
||||||
|
.bootstrap_timeline(TIMELINE_ID, pg_version, None, &ctx)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
// Decode and ingest wal. We process the wal in chunks because
|
// Initialize walingest
|
||||||
// that's what happens when we get bytes from safekeepers.
|
let xlogoff: usize = startpoint.segment_offset(WAL_SEGMENT_SIZE);
|
||||||
for chunk in bytes[xlogoff..].chunks(50) {
|
let mut decoder = WalStreamDecoder::new(startpoint, pg_version);
|
||||||
decoder.feed_bytes(chunk);
|
let mut walingest = WalIngest::new(tline.as_ref(), startpoint, &ctx)
|
||||||
while let Some((lsn, recdata)) = decoder.poll_decode().unwrap() {
|
.await
|
||||||
walingest
|
.unwrap();
|
||||||
.ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx)
|
let mut modification = tline.begin_modification(endpoint);
|
||||||
.await
|
let mut decoded = DecodedWALRecord::default();
|
||||||
.unwrap();
|
println!("decoding {} bytes", bytes.len() - xlogoff);
|
||||||
|
|
||||||
|
// Start profiling
|
||||||
|
let prof_guard = crate::profiling::profpoint_start();
|
||||||
|
let started_at = std::time::Instant::now();
|
||||||
|
|
||||||
|
// Decode and ingest wal.
|
||||||
|
//
|
||||||
|
// NOTE We process the wal in chunks because that's what happens
|
||||||
|
// when we get bytes from safekeepers. We use size 1906 because
|
||||||
|
// that was the average chunk size during the test that generated
|
||||||
|
// this data.
|
||||||
|
for chunk in bytes[xlogoff..].chunks(1906) {
|
||||||
|
decoder.feed_bytes(chunk);
|
||||||
|
while let Some((lsn, recdata)) = decoder.poll_decode().unwrap() {
|
||||||
|
walingest
|
||||||
|
.ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Do most of the work we do on every XLogData message in
|
||||||
|
// walreceiver_connection.rs just to check that at the current
|
||||||
|
// chunk size this work doesn't matter.
|
||||||
|
tline.check_checkpoint_distance().await.unwrap();
|
||||||
|
tline.get_current_logical_size(&ctx).size_dont_care_about_accuracy();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
drop(prof_guard);
|
||||||
|
|
||||||
|
let duration = started_at.elapsed();
|
||||||
|
println!("done iteration {} in {:?}", iteration, duration);
|
||||||
}
|
}
|
||||||
|
|
||||||
let duration = started_at.elapsed();
|
crate::profiling::exit_profiler(&profiler);
|
||||||
println!("done in {:?}", duration);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
BIN
pageserver/test_data/sk_wal_segment_from_pgbench.gz
Normal file
BIN
pageserver/test_data/sk_wal_segment_from_pgbench.gz
Normal file
Binary file not shown.
Reference in New Issue
Block a user