Compare commits

...

15 Commits

Author SHA1 Message Date
Bojan Serafimov
8657f8e0c0 wip 2023-12-04 15:56:07 -05:00
Bojan Serafimov
c358014389 configurable n_iterations 2023-12-04 15:15:25 -05:00
Bojan Serafimov
c0af1a5044 wip 2023-12-04 15:08:27 -05:00
Bojan Serafimov
094e7606a4 wip 2023-12-04 14:39:55 -05:00
Bojan Serafimov
dd4ff8ef9e wip 2023-12-04 14:19:45 -05:00
Bojan Serafimov
13b076401a no remote storage 2023-12-04 13:20:15 -05:00
Bojan Serafimov
5615aca244 Merge branch 'main' into add-profiler 2023-12-01 09:36:59 -05:00
Bojan Serafimov
922c4f07d5 Repeat 1000 times 2023-11-28 13:13:21 -05:00
Bojan Serafimov
0671fdd265 cargo lock 2023-11-28 12:56:13 -05:00
Bojan Serafimov
3f4fd576c6 Merge branch 'main' into add-profiler 2023-11-28 12:53:39 -05:00
Bojan Serafimov
4790f8725e Basic profiler setup (wip) 2023-11-27 17:18:00 -05:00
Bojan Serafimov
ea309b2a5b Remove anyhow, compress test data 2023-11-27 15:10:32 -05:00
bojanserafimov
8568456f95 accept suggestion
Co-authored-by: Joonas Koivunen <joonas@neon.tech>
2023-11-27 11:23:30 -05:00
bojanserafimov
d9af548ca8 accept suggestion
Co-authored-by: Joonas Koivunen <joonas@neon.tech>
2023-11-27 11:23:21 -05:00
Bojan Serafimov
128176702e Add walingest test 2023-11-21 13:05:39 -05:00
9 changed files with 1147 additions and 661 deletions

1600
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -255,6 +255,7 @@ pub enum GenericRemoteStorage {
AwsS3(Arc<S3Bucket>),
AzureBlob(Arc<AzureBlobStorage>),
Unreliable(Arc<UnreliableWrapper>),
Nothing,
}
impl GenericRemoteStorage {
@@ -268,6 +269,7 @@ impl GenericRemoteStorage {
Self::AwsS3(s) => s.list(prefix, mode).await,
Self::AzureBlob(s) => s.list(prefix, mode).await,
Self::Unreliable(s) => s.list(prefix, mode).await,
Self::Nothing => unimplemented!(),
}
}
@@ -280,6 +282,7 @@ impl GenericRemoteStorage {
Self::AwsS3(s) => s.list_files(folder).await,
Self::AzureBlob(s) => s.list_files(folder).await,
Self::Unreliable(s) => s.list_files(folder).await,
Self::Nothing => unimplemented!(),
}
}
@@ -295,6 +298,7 @@ impl GenericRemoteStorage {
Self::AwsS3(s) => s.list_prefixes(prefix).await,
Self::AzureBlob(s) => s.list_prefixes(prefix).await,
Self::Unreliable(s) => s.list_prefixes(prefix).await,
Self::Nothing => unimplemented!(),
}
}
@@ -310,6 +314,7 @@ impl GenericRemoteStorage {
Self::AwsS3(s) => s.upload(from, data_size_bytes, to, metadata).await,
Self::AzureBlob(s) => s.upload(from, data_size_bytes, to, metadata).await,
Self::Unreliable(s) => s.upload(from, data_size_bytes, to, metadata).await,
Self::Nothing => Ok(()),
}
}
@@ -319,6 +324,7 @@ impl GenericRemoteStorage {
Self::AwsS3(s) => s.download(from).await,
Self::AzureBlob(s) => s.download(from).await,
Self::Unreliable(s) => s.download(from).await,
Self::Nothing => unimplemented!(),
}
}
@@ -345,6 +351,7 @@ impl GenericRemoteStorage {
s.download_byte_range(from, start_inclusive, end_exclusive)
.await
}
Self::Nothing => unimplemented!(),
}
}
@@ -354,6 +361,7 @@ impl GenericRemoteStorage {
Self::AwsS3(s) => s.delete(path).await,
Self::AzureBlob(s) => s.delete(path).await,
Self::Unreliable(s) => s.delete(path).await,
Self::Nothing => Ok(()),
}
}
@@ -363,6 +371,7 @@ impl GenericRemoteStorage {
Self::AwsS3(s) => s.delete_objects(paths).await,
Self::AzureBlob(s) => s.delete_objects(paths).await,
Self::Unreliable(s) => s.delete_objects(paths).await,
Self::Nothing => Ok(()),
}
}
}
@@ -384,6 +393,7 @@ impl GenericRemoteStorage {
azure_config.container_name, azure_config.container_region, azure_config.prefix_in_container);
Self::AzureBlob(Arc::new(AzureBlobStorage::new(azure_config)?))
}
RemoteStorageKind::Nothing => Self::Nothing,
})
}
@@ -438,6 +448,8 @@ pub struct RemoteStorageConfig {
/// A kind of a remote storage to connect to, with its connection configuration.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RemoteStorageKind {
/// For microbenchmarks it's useful to turn off remote storage
Nothing,
/// Storage based on local file system.
/// Specify a root folder to place all stored files into.
LocalFs(Utf8PathBuf),

View File

@@ -9,6 +9,7 @@ default = []
# Enables test-only APIs, including failpoints. In particular, enables the `fail_point!` macro,
# which adds some runtime cost to run tests on outage conditions
testing = ["fail/failpoints"]
profiling = ["pprof"]
[dependencies]
anyhow.workspace = true
@@ -83,6 +84,7 @@ enum-map.workspace = true
enumset.workspace = true
strum.workspace = true
strum_macros.workspace = true
pprof = { git = "https://github.com/neondatabase/pprof-rs.git", branch = "wallclock-profiling", features = ["flamegraph"], optional = true }
[dev-dependencies]
criterion.workspace = true

View File

@@ -49,6 +49,8 @@ const PID_FILE_NAME: &str = "pageserver.pid";
const FEATURES: &[&str] = &[
#[cfg(feature = "testing")]
"testing",
#[cfg(feature = "profiling")]
"profiling",
];
fn version() -> String {

View File

@@ -15,6 +15,7 @@ pub mod metrics;
pub mod page_cache;
pub mod page_service;
pub mod pgdatadir_mapping;
pub mod profiling;
pub mod repository;
pub(crate) mod statvfs;
pub mod task_mgr;

View File

@@ -0,0 +1,87 @@
//!
//! Support for profiling
//!
//! This relies on a modified version of the 'pprof-rs' crate. That's not very
//! nice, so to avoid a hard dependency on that, this is an optional feature.
//!
/// The actual implementation is in the `profiling_impl` submodule. If the profiling
/// feature is not enabled, it's just a dummy implementation that panics if you
/// try to enable profiling in the configuration.
pub use profiling_impl::*;
#[cfg(feature = "profiling")]
mod profiling_impl {
use super::*;
// `pprof` here is the neondatabase fork (branch "wallclock-profiling"), which
// exposes `start_profiling` / `stop_profiling` — the upstream crate does not.
use pprof;
use std::marker::PhantomData;
/// Start profiling the current thread. Returns a guard object;
/// the profiling continues until the guard is dropped.
///
/// Note: profiling is not re-entrant. If you call 'profpoint_start' while
/// profiling is already started, nothing happens, and the profiling will be
/// stopped when either guard object is dropped.
#[inline]
pub fn profpoint_start() -> Option<ProfilingGuard> {
pprof::start_profiling();
Some(ProfilingGuard(PhantomData))
}
/// A hack to remove Send and Sync from the ProfilingGuard. Because the
/// profiling is attached to the current thread.
///
/// See comments in https://github.com/rust-lang/rust/issues/68318
type PhantomUnsend = std::marker::PhantomData<*mut u8>;
/// Guard returned by [`profpoint_start`]; dropping it stops per-thread profiling.
pub struct ProfilingGuard(PhantomUnsend);
// NOTE(review): this re-adds `Send`, completely undoing the `PhantomUnsend`
// hack above. It looks necessary so the guard can be held across `.await`
// points in tests running on a multi-threaded runtime, but if the guard is
// actually dropped on a different thread, `stop_profiling` would act on the
// wrong thread. Confirm this is intended, or document the invariant that the
// guard must be dropped on the thread that created it.
unsafe impl Send for ProfilingGuard {}
impl Drop for ProfilingGuard {
fn drop(&mut self) {
// Stops profiling for the current thread (see re-entrancy note above:
// whichever guard drops first stops it).
pprof::stop_profiling();
}
}
/// Initialize the profiler. This must be called before any 'profpoint_start' calls.
/// Panics if the profiler cannot be built.
pub fn init_profiler<'a>() -> Option<pprof::ProfilerGuard<'a>> {
Some(pprof::ProfilerGuardBuilder::default().build().unwrap())
}
/// Exit the profiler. Writes the flamegraph to current workdir.
/// No-op when `profiler_guard` is `None`; silently skips output if the
/// report cannot be built, but panics on file-creation/write errors.
pub fn exit_profiler(profiler_guard: &Option<pprof::ProfilerGuard>) {
// Write out the flamegraph
if let Some(profiler_guard) = profiler_guard {
if let Ok(report) = profiler_guard.report().build() {
// this gets written under the workdir
let file = std::fs::File::create("flamegraph.svg").unwrap();
let mut options = pprof::flamegraph::Options::default();
// Wide image so deep/wide stacks stay readable.
options.image_width = Some(2500);
report.flamegraph_with_options(file, &mut options).unwrap();
}
}
}
}
/// Dummy implementation when compiling without profiling feature or for non-linux OSes.
#[cfg(not(feature = "profiling"))]
mod profiling_impl {
    /// Stand-in for the real `ProfilingGuard`, so callers can hold an
    /// `Option<DummyProfilerGuard>` with the same shape as the real API.
    pub struct DummyProfilerGuard;

    impl Drop for DummyProfilerGuard {
        fn drop(&mut self) {
            // do nothing, this exists to calm Clippy down
        }
    }

    /// No-op counterpart of the real `profpoint_start`. Always returns `None`,
    /// so no guard is created and nothing is profiled.
    pub fn profpoint_start() -> Option<DummyProfilerGuard> {
        None
    }

    /// No-op counterpart of the real `init_profiler`. Always returns `None`.
    pub fn init_profiler() -> Option<DummyProfilerGuard> {
        None
    }

    /// No-op counterpart of the real `exit_profiler`; writes no output.
    /// Parameter is underscore-prefixed to silence the unused-variable warning.
    pub fn exit_profiler(_profiler_guard: &Option<DummyProfilerGuard>) {}
}

View File

@@ -3760,7 +3760,10 @@ pub(crate) mod harness {
let remote_fs_dir = conf.workdir.join("localfs");
std::fs::create_dir_all(&remote_fs_dir).unwrap();
let config = RemoteStorageConfig {
storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
storage: RemoteStorageKind::Nothing,
// TODO use the following for tests that need it:
// storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
};
let remote_storage = GenericRemoteStorage::from_config(&config).unwrap();
let deletion_queue = MockDeletionQueue::new(Some(remote_storage.clone()));

View File

@@ -2113,21 +2113,6 @@ mod tests {
let startpoint = Lsn::from_hex("14AEC08").unwrap();
let endpoint = Lsn::from_hex("1FFFF98").unwrap();
// Bootstrap a real timeline. We can't use create_test_timeline because
// it doesn't create a real checkpoint, and Walingest::new tries to parse
// the garbage data.
//
// TODO use the initdb.tar.zst file stored with the test data to avoid
// problems with inconsistent initdb results after pg minor version bumps.
let (tenant, ctx) = TenantHarness::create("test_ingest_real_wal")
.unwrap()
.load()
.await;
let tline = tenant
.bootstrap_timeline(TIMELINE_ID, pg_version, None, &ctx)
.await
.unwrap();
// We fully read and decompress this into memory before decoding
// to get a more accurate perf profile of the decoder.
let bytes = {
@@ -2141,32 +2126,72 @@ mod tests {
buffer
};
// TODO start a profiler too
let started_at = std::time::Instant::now();
// Allow number of iterations to be configured via env var, which is
// useful when using this test for benchmarking.
let n_iterations: usize =
std::env::var("NUM_TEST_ITERATIONS")
.map(|s| s.parse().unwrap())
.unwrap_or(1);
let profiler = crate::profiling::init_profiler();
// Initialize walingest
let xlogoff: usize = startpoint.segment_offset(WAL_SEGMENT_SIZE);
let mut decoder = WalStreamDecoder::new(startpoint, pg_version);
let mut walingest = WalIngest::new(tline.as_ref(), startpoint, &ctx)
.await
.unwrap();
let mut modification = tline.begin_modification(endpoint);
let mut decoded = DecodedWALRecord::default();
println!("decoding {} bytes", bytes.len() - xlogoff);
for iteration in 0..n_iterations {
// Bootstrap a real timeline. We can't use create_test_timeline because
// it doesn't create a real checkpoint, and Walingest::new tries to parse
// the garbage data.
//
// TODO use the initdb.tar.zst file stored with the test data to avoid
// problems with inconsistent initdb results after pg minor version bumps.
let (tenant, ctx) = TenantHarness::create("test_ingest_real_wal")
.unwrap()
.load()
.await;
let tline = tenant
.bootstrap_timeline(TIMELINE_ID, pg_version, None, &ctx)
.await
.unwrap();
// Decode and ingest wal. We process the wal in chunks because
// that's what happens when we get bytes from safekeepers.
for chunk in bytes[xlogoff..].chunks(50) {
decoder.feed_bytes(chunk);
while let Some((lsn, recdata)) = decoder.poll_decode().unwrap() {
walingest
.ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx)
.await
.unwrap();
// Initialize walingest
let xlogoff: usize = startpoint.segment_offset(WAL_SEGMENT_SIZE);
let mut decoder = WalStreamDecoder::new(startpoint, pg_version);
let mut walingest = WalIngest::new(tline.as_ref(), startpoint, &ctx)
.await
.unwrap();
let mut modification = tline.begin_modification(endpoint);
let mut decoded = DecodedWALRecord::default();
println!("decoding {} bytes", bytes.len() - xlogoff);
// Start profiling
let prof_guard = crate::profiling::profpoint_start();
let started_at = std::time::Instant::now();
// Decode and ingest wal.
//
// NOTE We process the wal in chunks because that's what happens
// when we get bytes from safekeepers. We use size 1906 because
// that was the average chunk size during the test that generated
// this data.
for chunk in bytes[xlogoff..].chunks(1906) {
decoder.feed_bytes(chunk);
while let Some((lsn, recdata)) = decoder.poll_decode().unwrap() {
walingest
.ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx)
.await
.unwrap();
}
// Do most of the work we do on every XLogData message in
// walreceiver_connection.rs just to check that at the current
// chunk size this work doesn't matter.
tline.check_checkpoint_distance().await.unwrap();
tline.get_current_logical_size(&ctx).size_dont_care_about_accuracy();
}
drop(prof_guard);
let duration = started_at.elapsed();
println!("done iteration {} in {:?}", iteration, duration);
}
let duration = started_at.elapsed();
println!("done in {:?}", duration);
crate::profiling::exit_profiler(&profiler);
}
}

Binary file not shown.