Mirror of https://github.com/neondatabase/neon.git (synced 2026-01-18 02:42:56 +00:00)

Compare commits: proxy-cpla ... add-profil (15 commits)
| SHA1 |
|---|
| 8657f8e0c0 |
| c358014389 |
| c0af1a5044 |
| 094e7606a4 |
| dd4ff8ef9e |
| 13b076401a |
| 5615aca244 |
| 922c4f07d5 |
| 0671fdd265 |
| 3f4fd576c6 |
| 4790f8725e |
| ea309b2a5b |
| 8568456f95 |
| d9af548ca8 |
| 128176702e |
Cargo.lock (generated): 1600 lines changed. File diff suppressed because it is too large.
```diff
@@ -255,6 +255,7 @@ pub enum GenericRemoteStorage {
     AwsS3(Arc<S3Bucket>),
     AzureBlob(Arc<AzureBlobStorage>),
     Unreliable(Arc<UnreliableWrapper>),
+    Nothing,
 }
 
 impl GenericRemoteStorage {
@@ -268,6 +269,7 @@ impl GenericRemoteStorage {
             Self::AwsS3(s) => s.list(prefix, mode).await,
             Self::AzureBlob(s) => s.list(prefix, mode).await,
             Self::Unreliable(s) => s.list(prefix, mode).await,
+            Self::Nothing => unimplemented!(),
         }
     }
 
@@ -280,6 +282,7 @@ impl GenericRemoteStorage {
             Self::AwsS3(s) => s.list_files(folder).await,
             Self::AzureBlob(s) => s.list_files(folder).await,
             Self::Unreliable(s) => s.list_files(folder).await,
+            Self::Nothing => unimplemented!(),
         }
     }
 
@@ -295,6 +298,7 @@ impl GenericRemoteStorage {
             Self::AwsS3(s) => s.list_prefixes(prefix).await,
             Self::AzureBlob(s) => s.list_prefixes(prefix).await,
             Self::Unreliable(s) => s.list_prefixes(prefix).await,
+            Self::Nothing => unimplemented!(),
        }
    }
 
@@ -310,6 +314,7 @@ impl GenericRemoteStorage {
             Self::AwsS3(s) => s.upload(from, data_size_bytes, to, metadata).await,
             Self::AzureBlob(s) => s.upload(from, data_size_bytes, to, metadata).await,
             Self::Unreliable(s) => s.upload(from, data_size_bytes, to, metadata).await,
+            Self::Nothing => Ok(()),
         }
     }
 
@@ -319,6 +324,7 @@ impl GenericRemoteStorage {
             Self::AwsS3(s) => s.download(from).await,
             Self::AzureBlob(s) => s.download(from).await,
             Self::Unreliable(s) => s.download(from).await,
+            Self::Nothing => unimplemented!(),
         }
     }
 
@@ -345,6 +351,7 @@ impl GenericRemoteStorage {
                 s.download_byte_range(from, start_inclusive, end_exclusive)
                     .await
             }
+            Self::Nothing => unimplemented!(),
         }
     }
 
@@ -354,6 +361,7 @@ impl GenericRemoteStorage {
             Self::AwsS3(s) => s.delete(path).await,
             Self::AzureBlob(s) => s.delete(path).await,
             Self::Unreliable(s) => s.delete(path).await,
+            Self::Nothing => Ok(()),
         }
     }
 
@@ -363,6 +371,7 @@ impl GenericRemoteStorage {
             Self::AwsS3(s) => s.delete_objects(paths).await,
             Self::AzureBlob(s) => s.delete_objects(paths).await,
             Self::Unreliable(s) => s.delete_objects(paths).await,
+            Self::Nothing => Ok(()),
         }
     }
 }
@@ -384,6 +393,7 @@ impl GenericRemoteStorage {
                     azure_config.container_name, azure_config.container_region, azure_config.prefix_in_container);
                 Self::AzureBlob(Arc::new(AzureBlobStorage::new(azure_config)?))
             }
+            RemoteStorageKind::Nothing => Self::Nothing,
         })
     }
 
@@ -438,6 +448,8 @@ pub struct RemoteStorageConfig {
 /// A kind of a remote storage to connect to, with its connection configuration.
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub enum RemoteStorageKind {
+    /// For microbenchmarks it's useful to turn off remote storage
+    Nothing,
     /// Storage based on local file system.
     /// Specify a root folder to place all stored files into.
     LocalFs(Utf8PathBuf),
```
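For orientation, a minimal sketch (not part of the diff) of how the new `Nothing` kind can be wired up. The `remote_storage` import path and the `anyhow::Result` return type are assumptions; the field set of `RemoteStorageConfig` mirrors the test-harness hunk further down.

```rust
// Sketch only, under the assumptions stated above.
use remote_storage::{GenericRemoteStorage, RemoteStorageConfig, RemoteStorageKind};

fn make_noop_storage() -> anyhow::Result<GenericRemoteStorage> {
    // Mirrors the test-harness change below: only the `storage` field is set.
    let config = RemoteStorageConfig {
        storage: RemoteStorageKind::Nothing,
    };
    // Write-side calls (upload, delete, delete_objects) on the result are
    // Ok(()) no-ops; read-side calls (list, download, ...) hit the
    // unimplemented!() arms and panic.
    GenericRemoteStorage::from_config(&config)
}
```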
```diff
@@ -9,6 +9,7 @@ default = []
 # Enables test-only APIs, incuding failpoints. In particular, enables the `fail_point!` macro,
 # which adds some runtime cost to run tests on outage conditions
 testing = ["fail/failpoints"]
+profiling = ["pprof"]
 
 [dependencies]
 anyhow.workspace = true
@@ -83,6 +84,7 @@ enum-map.workspace = true
 enumset.workspace = true
 strum.workspace = true
 strum_macros.workspace = true
+pprof = { git = "https://github.com/neondatabase/pprof-rs.git", branch = "wallclock-profiling", features = ["flamegraph"], optional = true }
 
 [dev-dependencies]
 criterion.workspace = true
```
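Since `pprof` is declared `optional`, any code that touches it has to be gated on the new `profiling` feature. A small illustrative sketch of that gating pattern (hypothetical module and function names, not from the diff); the FEATURES list and the new profiling module below use the same pattern:

```rust
// Illustration only: `profiling = ["pprof"]` pulls in the optional pprof
// dependency, so pprof-using code must be compiled out by default.
#[cfg(feature = "profiling")]
mod profiling_backend {
    pub fn name() -> &'static str {
        "pprof" // built with `--features profiling`
    }
}

#[cfg(not(feature = "profiling"))]
mod profiling_backend {
    pub fn name() -> &'static str {
        "disabled" // default build: pprof is not in the dependency graph
    }
}
```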
```diff
@@ -49,6 +49,8 @@ const PID_FILE_NAME: &str = "pageserver.pid";
 const FEATURES: &[&str] = &[
     #[cfg(feature = "testing")]
     "testing",
+    #[cfg(feature = "profiling")]
+    "profiling",
 ];
 
 fn version() -> String {
```
```diff
@@ -15,6 +15,7 @@ pub mod metrics;
 pub mod page_cache;
 pub mod page_service;
 pub mod pgdatadir_mapping;
+pub mod profiling;
 pub mod repository;
 pub(crate) mod statvfs;
 pub mod task_mgr;
```
pageserver/src/profiling.rs (new file, 87 lines):

```rust
//!
//! Support for profiling
//!
//! This relies on a modified version of the 'pprof-rs' crate. That's not very
//! nice, so to avoid a hard dependency on that, this is an optional feature.
//!

/// The actual implementation is in the `profiling_impl` submodule. If the profiling
/// feature is not enabled, it's just a dummy implementation that panics if you
/// try to enabled profiling in the configuration.
pub use profiling_impl::*;

#[cfg(feature = "profiling")]
mod profiling_impl {
    use super::*;
    use pprof;
    use std::marker::PhantomData;

    /// Start profiling the current thread. Returns a guard object;
    /// the profiling continues until the guard is dropped.
    ///
    /// Note: profiling is not re-entrant. If you call 'profpoint_start' while
    /// profiling is already started, nothing happens, and the profiling will be
    /// stopped when either guard object is dropped.
    #[inline]
    pub fn profpoint_start() -> Option<ProfilingGuard> {
        pprof::start_profiling();
        Some(ProfilingGuard(PhantomData))
    }

    /// A hack to remove Send and Sync from the ProfilingGuard. Because the
    /// profiling is attached to current thread.
    ////
    /// See comments in https://github.com/rust-lang/rust/issues/68318
    type PhantomUnsend = std::marker::PhantomData<*mut u8>;

    pub struct ProfilingGuard(PhantomUnsend);

    unsafe impl Send for ProfilingGuard {}

    impl Drop for ProfilingGuard {
        fn drop(&mut self) {
            pprof::stop_profiling();
        }
    }

    /// Initialize the profiler. This must be called before any 'profpoint_start' calls.
    pub fn init_profiler<'a>() -> Option<pprof::ProfilerGuard<'a>> {
        Some(pprof::ProfilerGuardBuilder::default().build().unwrap())
    }

    /// Exit the profiler. Writes the flamegraph to current workdir.
    pub fn exit_profiler(profiler_guard: &Option<pprof::ProfilerGuard>) {
        // Write out the flamegraph
        if let Some(profiler_guard) = profiler_guard {
            if let Ok(report) = profiler_guard.report().build() {
                // this gets written under the workdir
                let file = std::fs::File::create("flamegraph.svg").unwrap();
                let mut options = pprof::flamegraph::Options::default();
                options.image_width = Some(2500);
                report.flamegraph_with_options(file, &mut options).unwrap();
            }
        }
    }
}

/// Dummy implementation when compiling without profiling feature or for non-linux OSes.
#[cfg(not(feature = "profiling"))]
mod profiling_impl {
    pub struct DummyProfilerGuard;

    impl Drop for DummyProfilerGuard {
        fn drop(&mut self) {
            // do nothing, this exists to calm Clippy down
        }
    }

    pub fn profpoint_start() -> Option<DummyProfilerGuard> {
        None
    }

    pub fn init_profiler() -> Option<DummyProfilerGuard> {
        None
    }

    pub fn exit_profiler(profiler_guard: &Option<DummyProfilerGuard>) {}
}
```
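A short usage sketch of the API this file exposes (assumed, mirroring the walingest test change below; `do_work` is a placeholder):

```rust
// Sketch only: mirrors how the walingest test below drives the profiler.
fn profile_some_work() {
    // One profiler for the whole run; with the `profiling` feature this is a
    // pprof::ProfilerGuard, otherwise the dummy guard that does nothing.
    let profiler = crate::profiling::init_profiler();

    // Per-section guard: profiling of the current thread runs until it is dropped.
    let prof_guard = crate::profiling::profpoint_start();
    do_work();
    drop(prof_guard);

    // Writes flamegraph.svg into the current workdir (no-op in the dummy build).
    crate::profiling::exit_profiler(&profiler);
}

fn do_work() {
    // placeholder for the code being measured
}
```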
```diff
@@ -3760,7 +3760,10 @@ pub(crate) mod harness {
         let remote_fs_dir = conf.workdir.join("localfs");
         std::fs::create_dir_all(&remote_fs_dir).unwrap();
         let config = RemoteStorageConfig {
-            storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
+            storage: RemoteStorageKind::Nothing,
+
+            // TODO use the following for tests that need it:
+            // storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
         };
         let remote_storage = GenericRemoteStorage::from_config(&config).unwrap();
         let deletion_queue = MockDeletionQueue::new(Some(remote_storage.clone()));
```
```diff
@@ -2113,21 +2113,6 @@ mod tests {
         let startpoint = Lsn::from_hex("14AEC08").unwrap();
         let endpoint = Lsn::from_hex("1FFFF98").unwrap();
 
-        // Bootstrap a real timeline. We can't use create_test_timeline because
-        // it doesn't create a real checkpoint, and Walingest::new tries to parse
-        // the garbage data.
-        //
-        // TODO use the initdb.tar.zst file stored with the test data to avoid
-        // problems with inconsistent initdb results after pg minor version bumps.
-        let (tenant, ctx) = TenantHarness::create("test_ingest_real_wal")
-            .unwrap()
-            .load()
-            .await;
-        let tline = tenant
-            .bootstrap_timeline(TIMELINE_ID, pg_version, None, &ctx)
-            .await
-            .unwrap();
-
         // We fully read and decompress this into memory before decoding
         // to get a more accurate perf profile of the decoder.
         let bytes = {
@@ -2141,32 +2126,72 @@ mod tests {
             buffer
         };
 
-        // TODO start a profiler too
-        let started_at = std::time::Instant::now();
+        // Allow number of iterations to be configured via env var, which is
+        // useful when using this test for benchmarking.
+        let n_iterations: usize =
+            std::env::var("NUM_TEST_ITERATIONS")
+                .map(|s| s.parse().unwrap())
+                .unwrap_or(1);
+        let profiler = crate::profiling::init_profiler();
 
-        // Initialize walingest
-        let xlogoff: usize = startpoint.segment_offset(WAL_SEGMENT_SIZE);
-        let mut decoder = WalStreamDecoder::new(startpoint, pg_version);
-        let mut walingest = WalIngest::new(tline.as_ref(), startpoint, &ctx)
-            .await
-            .unwrap();
-        let mut modification = tline.begin_modification(endpoint);
-        let mut decoded = DecodedWALRecord::default();
-        println!("decoding {} bytes", bytes.len() - xlogoff);
+        for iteration in 0..n_iterations {
+            // Bootstrap a real timeline. We can't use create_test_timeline because
+            // it doesn't create a real checkpoint, and Walingest::new tries to parse
+            // the garbage data.
+            //
+            // TODO use the initdb.tar.zst file stored with the test data to avoid
+            // problems with inconsistent initdb results after pg minor version bumps.
+            let (tenant, ctx) = TenantHarness::create("test_ingest_real_wal")
+                .unwrap()
+                .load()
+                .await;
+            let tline = tenant
+                .bootstrap_timeline(TIMELINE_ID, pg_version, None, &ctx)
+                .await
+                .unwrap();
 
-        // Decode and ingest wal. We process the wal in chunks because
-        // that's what happens when we get bytes from safekeepers.
-        for chunk in bytes[xlogoff..].chunks(50) {
-            decoder.feed_bytes(chunk);
-            while let Some((lsn, recdata)) = decoder.poll_decode().unwrap() {
-                walingest
-                    .ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx)
-                    .await
-                    .unwrap();
+            // Initialize walingest
+            let xlogoff: usize = startpoint.segment_offset(WAL_SEGMENT_SIZE);
+            let mut decoder = WalStreamDecoder::new(startpoint, pg_version);
+            let mut walingest = WalIngest::new(tline.as_ref(), startpoint, &ctx)
+                .await
+                .unwrap();
+            let mut modification = tline.begin_modification(endpoint);
+            let mut decoded = DecodedWALRecord::default();
+            println!("decoding {} bytes", bytes.len() - xlogoff);
+
+            // Start profiling
+            let prof_guard = crate::profiling::profpoint_start();
+            let started_at = std::time::Instant::now();
+
+            // Decode and ingest wal.
+            //
+            // NOTE We process the wal in chunks because that's what happens
+            // when we get bytes from safekeepers. We use size 1906 because
+            // that was the average chunk size during the test that generated
+            // this data.
+            for chunk in bytes[xlogoff..].chunks(1906) {
+                decoder.feed_bytes(chunk);
+                while let Some((lsn, recdata)) = decoder.poll_decode().unwrap() {
+                    walingest
+                        .ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx)
+                        .await
+                        .unwrap();
                 }
+
+                // Do most of the work we do on every XLogData message in
+                // walreceiver_connection.rs just to check that at the current
+                // chunk size this work doesn't matter.
+                tline.check_checkpoint_distance().await.unwrap();
+                tline.get_current_logical_size(&ctx).size_dont_care_about_accuracy();
             }
+
+            drop(prof_guard);
+
+            let duration = started_at.elapsed();
+            println!("done iteration {} in {:?}", iteration, duration);
+        }
 
-        let duration = started_at.elapsed();
-        println!("done in {:?}", duration);
+        crate::profiling::exit_profiler(&profiler);
     }
 }
```
pageserver/test_data/sk_wal_segment_from_pgbench.gz (new binary file; binary file not shown)