Compare commits

...

5 Commits

Author SHA1 Message Date
Bojan Serafimov
92016ff9b1 wip 2023-11-21 12:45:21 -05:00
Bojan Serafimov
c4af96ee7e wip 2023-11-21 12:15:49 -05:00
Bojan Serafimov
65eb48aab7 WIP: profile walingest 2023-11-21 10:24:05 -05:00
Bojan Serafimov
779a5ab9ff wip 2023-11-08 13:47:54 -05:00
Bojan Serafimov
3ce8969711 wip 2023-11-07 13:58:59 -05:00
11 changed files with 427 additions and 24 deletions

237
Cargo.lock generated
View File

@@ -77,6 +77,17 @@ dependencies = [
"opaque-debug",
]
[[package]]
name = "ahash"
version = "0.7.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a824f2aa7e75a0c98c5a504fceb80649e9c35265d44525b5f94de4771a395cd"
dependencies = [
"getrandom 0.2.9",
"once_cell",
"version_check",
]
[[package]]
name = "ahash"
version = "0.8.3"
@@ -185,6 +196,12 @@ dependencies = [
"static_assertions",
]
[[package]]
name = "arrayvec"
version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711"
[[package]]
name = "asn1-rs"
version = "0.5.2"
@@ -386,6 +403,17 @@ version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0"
[[package]]
name = "atty"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8"
dependencies = [
"hermit-abi 0.1.19",
"libc",
"winapi",
]
[[package]]
name = "autocfg"
version = "1.1.0"
@@ -474,7 +502,7 @@ dependencies = [
"http",
"percent-encoding",
"tracing",
"uuid",
"uuid 1.3.3",
]
[[package]]
@@ -848,7 +876,7 @@ dependencies = [
"log",
"paste",
"pin-project",
"quick-xml",
"quick-xml 0.30.0",
"rand 0.8.5",
"reqwest",
"rustc_version 0.4.0",
@@ -856,7 +884,7 @@ dependencies = [
"serde_json",
"time 0.3.21",
"url",
"uuid",
"uuid 1.3.3",
]
[[package]]
@@ -877,7 +905,7 @@ dependencies = [
"time 0.3.21",
"tz-rs",
"url",
"uuid",
"uuid 1.3.3",
]
[[package]]
@@ -899,7 +927,7 @@ dependencies = [
"sha2 0.10.6",
"time 0.3.21",
"url",
"uuid",
"uuid 1.3.3",
]
[[package]]
@@ -919,7 +947,7 @@ dependencies = [
"serde_json",
"time 0.3.21",
"url",
"uuid",
"uuid 1.3.3",
]
[[package]]
@@ -1067,6 +1095,12 @@ version = "3.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3e2c3daef883ecc1b5d58c15adae93470a91d425f3532ba1695849656af3fc1"
[[package]]
name = "bytemuck"
version = "1.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "374d28ec25809ee0e23827c2ab573d729e293f281dfe393500e7ad618baa61c6"
[[package]]
name = "byteorder"
version = "1.4.3"
@@ -1452,6 +1486,15 @@ version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e496a50fda8aacccc86d7529e2c1e0892dbd0f898a6b5645b5561b89c3210efa"
[[package]]
name = "cpp_demangle"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eeaa953eaad386a53111e47172c2fedba671e5684c8dd601a5f474f4f118710f"
dependencies = [
"cfg-if",
]
[[package]]
name = "cpufeatures"
version = "0.2.9"
@@ -1678,6 +1721,15 @@ version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2e66c9d817f1720209181c316d28635c050fa304f9c79e47a520882661b7308"
[[package]]
name = "debugid"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6ee87af31d84ef885378aebca32be3d682b0e0dc119d5b4860a2c5bb5046730"
dependencies = [
"uuid 0.8.2",
]
[[package]]
name = "debugid"
version = "0.8.0"
@@ -1685,7 +1737,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef552e6f588e446098f6ba40d89ac146c8c7b64aade83c051ee00bb5d2bc18d"
dependencies = [
"serde",
"uuid",
"uuid 1.3.3",
]
[[package]]
@@ -1885,6 +1937,18 @@ dependencies = [
"windows-sys 0.48.0",
]
[[package]]
name = "findshlibs"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "40b9e59cd0f7e0806cca4be089683ecb6434e602038df21fe6bf6711b2f07f64"
dependencies = [
"cc",
"lazy_static",
"libc",
"winapi",
]
[[package]]
name = "fixedbitset"
version = "0.4.2"
@@ -2196,7 +2260,7 @@ version = "0.13.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e"
dependencies = [
"ahash",
"ahash 0.8.3",
]
[[package]]
@@ -2232,6 +2296,15 @@ version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8"
[[package]]
name = "hermit-abi"
version = "0.1.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33"
dependencies = [
"libc",
]
[[package]]
name = "hermit-abi"
version = "0.3.3"
@@ -2508,6 +2581,24 @@ version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "64e9829a50b42bb782c1df523f78d332fe371b10c661e78b7a3c34b0198e9fac"
[[package]]
name = "inferno"
version = "0.10.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "de3886428c6400486522cf44b8626e7b94ad794c14390290f2a274dcf728a58f"
dependencies = [
"ahash 0.7.7",
"atty",
"indexmap",
"itoa",
"lazy_static",
"log",
"num-format",
"quick-xml 0.22.0",
"rgb",
"str_stack",
]
[[package]]
name = "inotify"
version = "0.9.6"
@@ -2559,7 +2650,7 @@ version = "1.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2"
dependencies = [
"hermit-abi",
"hermit-abi 0.3.3",
"libc",
"windows-sys 0.48.0",
]
@@ -2576,7 +2667,7 @@ version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "adcf93614601c8129ddf72e2d5633df827ba6551541c6d8c59520a371475be1f"
dependencies = [
"hermit-abi",
"hermit-abi 0.3.3",
"io-lifetimes",
"rustix 0.37.25",
"windows-sys 0.48.0",
@@ -2759,6 +2850,24 @@ version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d"
[[package]]
name = "memmap2"
version = "0.5.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "83faa42c0a078c393f6b29d5db232d8be22776a891f8f56e5284faee4a20b327"
dependencies = [
"libc",
]
[[package]]
name = "memoffset"
version = "0.6.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5aa361d4faea93603064a027415f07bd8e1d5c88c9fbf68bf56a285428fd79ce"
dependencies = [
"autocfg",
]
[[package]]
name = "memoffset"
version = "0.7.1"
@@ -2864,6 +2973,19 @@ dependencies = [
"tempfile",
]
[[package]]
name = "nix"
version = "0.23.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f3790c00a0150112de0f4cd161e3d7fc4b2d8a5542ffc35f099a2562aecb35c"
dependencies = [
"bitflags",
"cc",
"cfg-if",
"libc",
"memoffset 0.6.5",
]
[[package]]
name = "nix"
version = "0.25.1"
@@ -2938,6 +3060,16 @@ dependencies = [
"num-traits",
]
[[package]]
name = "num-format"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a652d9771a63711fd3c3deb670acfbe5c30a4072e664d7a3bf5a9e1056ac72c3"
dependencies = [
"arrayvec",
"itoa",
]
[[package]]
name = "num-integer"
version = "0.1.45"
@@ -2963,7 +3095,7 @@ version = "1.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43"
dependencies = [
"hermit-abi",
"hermit-abi 0.3.3",
"libc",
]
@@ -3256,6 +3388,7 @@ dependencies = [
"postgres_backend",
"postgres_connection",
"postgres_ffi",
"pprof",
"pq_proto",
"rand 0.8.5",
"regex",
@@ -3663,6 +3796,25 @@ dependencies = [
"workspace_hack",
]
[[package]]
name = "pprof"
version = "0.6.1"
source = "git+https://github.com/neondatabase/pprof-rs.git?branch=wallclock-profiling#4e011a87d22fb4d21d15cc38bce81ff1c75e4bc9"
dependencies = [
"backtrace",
"cfg-if",
"findshlibs",
"inferno",
"lazy_static",
"libc",
"log",
"nix 0.23.2",
"parking_lot 0.11.2",
"symbolic-demangle",
"tempfile",
"thiserror",
]
[[package]]
name = "ppv-lite86"
version = "0.2.17"
@@ -3869,12 +4021,21 @@ dependencies = [
"tracing-utils",
"url",
"utils",
"uuid",
"uuid 1.3.3",
"webpki-roots 0.25.2",
"workspace_hack",
"x509-parser",
]
[[package]]
name = "quick-xml"
version = "0.22.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8533f14c8382aaad0d592c812ac3b826162128b65662331e1127b45c3d18536b"
dependencies = [
"memchr",
]
[[package]]
name = "quick-xml"
version = "0.30.0"
@@ -4205,6 +4366,15 @@ dependencies = [
"rand 0.8.5",
]
[[package]]
name = "rgb"
version = "0.8.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05aaa8004b64fd573fc9d002f4e632d51ad4f026c2b5ba95fcb6c2f32c2c47d8"
dependencies = [
"bytemuck",
]
[[package]]
name = "ring"
version = "0.16.20"
@@ -4667,7 +4837,7 @@ version = "0.31.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "99dc599bd6646884fc403d593cdcb9816dd67c50cff3271c01ff123617908dcd"
dependencies = [
"debugid",
"debugid 0.8.0",
"getrandom 0.2.9",
"hex",
"serde",
@@ -4675,7 +4845,7 @@ dependencies = [
"thiserror",
"time 0.3.21",
"url",
"uuid",
"uuid 1.3.3",
]
[[package]]
@@ -5050,6 +5220,12 @@ dependencies = [
"workspace_hack",
]
[[package]]
name = "str_stack"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9091b6114800a5f2141aee1d1b9d6ca3592ac062dc5decb3764ec5895a47b4eb"
[[package]]
name = "stringprep"
version = "0.1.2"
@@ -5097,6 +5273,29 @@ version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8fb1df15f412ee2e9dfc1c504260fa695c1c3f10fe9f4a6ee2d2184d7d6450e2"
[[package]]
name = "symbolic-common"
version = "8.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f551f902d5642e58039aee6a9021a61037926af96e071816361644983966f540"
dependencies = [
"debugid 0.7.3",
"memmap2",
"stable_deref_trait",
"uuid 0.8.2",
]
[[package]]
name = "symbolic-demangle"
version = "8.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4564ca7b4e6eb14105aa8bbbce26e080f6b5d9c4373e67167ab31f7b86443750"
dependencies = [
"cpp_demangle",
"rustc-demangle",
"symbolic-common",
]
[[package]]
name = "syn"
version = "1.0.109"
@@ -5998,10 +6197,16 @@ dependencies = [
"tracing-error",
"tracing-subscriber",
"url",
"uuid",
"uuid 1.3.3",
"workspace_hack",
]
[[package]]
name = "uuid"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7"
[[package]]
name = "uuid"
version = "1.3.3"
@@ -6527,7 +6732,7 @@ dependencies = [
"tracing-core",
"tungstenite",
"url",
"uuid",
"uuid 1.3.3",
]
[[package]]

View File

@@ -9,6 +9,7 @@ default = []
# Enables test-only APIs, incuding failpoints. In particular, enables the `fail_point!` macro,
# which adds some runtime cost to run tests on outage conditions
testing = ["fail/failpoints"]
profiling = ["pprof"]
[dependencies]
anyhow.workspace = true
@@ -82,6 +83,7 @@ enum-map.workspace = true
enumset.workspace = true
strum.workspace = true
strum_macros.workspace = true
pprof = { git = "https://github.com/neondatabase/pprof-rs.git", branch = "wallclock-profiling", features = ["flamegraph"], optional = true }
[dev-dependencies]
criterion.workspace = true

View File

@@ -49,6 +49,8 @@ const PID_FILE_NAME: &str = "pageserver.pid";
const FEATURES: &[&str] = &[
#[cfg(feature = "testing")]
"testing",
#[cfg(feature = "profiling")]
"profiling",
];
fn version() -> String {
@@ -253,7 +255,7 @@ fn startup_checkpoint(started_at: Instant, phase: &str, human_phase: &str) {
fn start_pageserver(
launch_ts: &'static LaunchTimestamp,
conf: &'static PageServerConf,
) -> anyhow::Result<()> {
) -> anyhow::Result<()> { // TODO this should be anyhow::Result<!>
// Monotonic time for later calculating startup duration
let started_startup_at = Instant::now();
@@ -271,6 +273,8 @@ fn start_pageserver(
set_launch_timestamp_metric(launch_ts);
pageserver::preinitialize_metrics();
let profiler_guard = pageserver::profiling::init_profiler();
// If any failpoints were set from FAILPOINTS environment variable,
// print them to the log for debugging purposes
let failpoints = fail::list();
@@ -672,6 +676,9 @@ fn start_pageserver(
"Got {}. Terminating in immediate shutdown mode",
signal.name()
);
pageserver::profiling::exit_profiler(&profiler_guard);
std::process::exit(111);
}
@@ -687,6 +694,9 @@ fn start_pageserver(
shutdown_pageserver.take();
let bg_remote_storage = remote_storage.clone();
let bg_deletion_queue = deletion_queue.clone();
pageserver::profiling::exit_profiler(&profiler_guard);
BACKGROUND_RUNTIME.block_on(pageserver::shutdown_pageserver(
bg_remote_storage.map(|_| bg_deletion_queue),
0,

View File

@@ -618,3 +618,66 @@ async fn read_all_bytes(reader: &mut (impl AsyncRead + Unpin)) -> Result<Bytes>
reader.read_to_end(&mut buf).await?;
Ok(Bytes::from(buf))
}
#[cfg(test)]
mod tests {
use super::*;
use crate::tenant::harness::*;
#[tokio::test]
async fn test_basic() -> anyhow::Result<()> {
// Bootstrap a real timeline. We can't use create_test_timeline because
// it doesn't create a real checkpoint, and Walingest::new tries to parse
// the garbage data.
let pg_version = 15;
let (tenant, ctx) = TenantHarness::create("test_basic")?.load().await;
let tline = tenant
.bootstrap_timeline(TIMELINE_ID, pg_version, &ctx)
.await?;
// Get test data. Steps to reconstruct it, if needed:
// 1. Run the pgbench python test
// 2. Take the first wal segment file from safekeeper
// 3. Grep sk logs for "restart decoder" to get startpoint
// 4. Run just the decoder from this test to get the endpoint.
// It's the last LSN the decoder will output.
let path = "test_data/sk_wal_segment_from_pgbench";
let startpoint = Lsn::from_hex("14AEC08").unwrap();
let endpoint = Lsn::from_hex("1FFFF98").unwrap();
// We fully read this into memory before decoding to get a
// more accurate perf profile of the decoder.
let bytes = std::fs::read(path)?;
let profiler_guard = crate::profiling::init_profiler();
let prof_guard = crate::profiling::profpoint_start();
let started_at = std::time::Instant::now();
// Initialize walingest
let xlogoff: usize = startpoint.segment_offset(WAL_SEGMENT_SIZE);
let mut decoder = WalStreamDecoder::new(startpoint, pg_version);
let mut walingest = WalIngest::new(tline.as_ref(), startpoint, &ctx).await?;
let mut modification = tline.begin_modification(endpoint);
let mut decoded = DecodedWALRecord::default();
println!("decoding {} bytes", bytes.len() - xlogoff);
// Decode and ingest wal. We process the wal in chunks because
// that's what happens when we get bytes from safekeepers.
for chunk in bytes[xlogoff..].chunks(50) {
decoder.feed_bytes(chunk);
while let Some((lsn, recdata)) = decoder.poll_decode()? {
walingest
.ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx)
.await?;
}
}
let duration = started_at.elapsed();
println!("done in {:?}", duration);
drop(prof_guard);
crate::profiling::exit_profiler(&profiler_guard);
Ok(())
}
}

View File

@@ -24,6 +24,7 @@ pub mod virtual_file;
pub mod walingest;
pub mod walrecord;
pub mod walredo;
pub mod profiling;
pub mod failpoint_support;

View File

@@ -0,0 +1,87 @@
//!
//! Support for profiling
//!
//! This relies on a modified version of the 'pprof-rs' crate. That's not very
//! nice, so to avoid a hard dependency on that, this is an optional feature.
//!
/// The actual implementation is in the `profiling_impl` submodule. If the profiling
/// feature is not enabled, it's just a dummy implementation that panics if you
/// try to enabled profiling in the configuration.
pub use profiling_impl::*;
#[cfg(feature = "profiling")]
mod profiling_impl {
use super::*;
use pprof;
use std::marker::PhantomData;
/// Start profiling the current thread. Returns a guard object;
/// the profiling continues until the guard is dropped.
///
/// Note: profiling is not re-entrant. If you call 'profpoint_start' while
/// profiling is already started, nothing happens, and the profiling will be
/// stopped when either guard object is dropped.
#[inline]
pub fn profpoint_start() -> Option<ProfilingGuard> {
pprof::start_profiling();
Some(ProfilingGuard(PhantomData))
}
/// A hack to remove Send and Sync from the ProfilingGuard. Because the
/// profiling is attached to current thread.
////
/// See comments in https://github.com/rust-lang/rust/issues/68318
type PhantomUnsend = std::marker::PhantomData<*mut u8>;
pub struct ProfilingGuard(PhantomUnsend);
unsafe impl Send for ProfilingGuard {}
impl Drop for ProfilingGuard {
fn drop(&mut self) {
pprof::stop_profiling();
}
}
/// Initialize the profiler. This must be called before any 'profpoint_start' calls.
pub fn init_profiler<'a>() -> Option<pprof::ProfilerGuard<'a>> {
Some(pprof::ProfilerGuardBuilder::default().build().unwrap())
}
/// Exit the profiler. Writes the flamegraph to current workdir.
pub fn exit_profiler(profiler_guard: &Option<pprof::ProfilerGuard>) {
// Write out the flamegraph
if let Some(profiler_guard) = profiler_guard {
if let Ok(report) = profiler_guard.report().build() {
// this gets written under the workdir
let file = std::fs::File::create("flamegraph.svg").unwrap();
let mut options = pprof::flamegraph::Options::default();
options.image_width = Some(2500);
report.flamegraph_with_options(file, &mut options).unwrap();
}
}
}
}
/// Dummy implementation when compiling without profiling feature or for non-linux OSes.
#[cfg(not(feature = "profiling"))]
mod profiling_impl {
pub struct DummyProfilerGuard;
impl Drop for DummyProfilerGuard {
fn drop(&mut self) {
// do nothing, this exists to calm Clippy down
}
}
pub fn profpoint_start() -> Option<DummyProfilerGuard> {
None
}
pub fn init_profiler() -> Option<DummyProfilerGuard> {
None
}
pub fn exit_profiler(profiler_guard: &Option<DummyProfilerGuard>) {}
}

View File

@@ -121,6 +121,7 @@ pub static MGMT_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
pub static WALRECEIVER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
tokio::runtime::Builder::new_multi_thread()
// .worker_threads(1)
.thread_name("walreceiver worker")
.enable_all()
.build()

View File

@@ -2868,7 +2868,7 @@ impl Tenant {
/// - after initialization complete, remove the temp dir.
///
/// The caller is responsible for activating the returned timeline.
async fn bootstrap_timeline(
pub async fn bootstrap_timeline(
&self,
timeline_id: TimelineId,
pg_version: u32,

View File

@@ -233,8 +233,10 @@ impl BlobWriter<false> {
#[cfg(test)]
mod tests {
use std::time::Instant;
use super::*;
use crate::{context::DownloadBehavior, task_mgr::TaskKind, tenant::block_io::BlockReaderRef};
use crate::{context::DownloadBehavior, task_mgr::TaskKind, tenant::block_io::{BlockReaderRef, FileBlockReader}};
use rand::{Rng, SeedableRng};
async fn round_trip_test<const BUFFERED: bool>(blobs: &[Vec<u8>]) -> Result<(), Error> {
@@ -243,6 +245,7 @@ mod tests {
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
// Write part (in block to drop the file)
let now = Instant::now();
let mut offsets = Vec::new();
{
let file = VirtualFile::create(pathbuf.as_path()).await?;
@@ -255,12 +258,19 @@ mod tests {
// read again with read_blk
let offs = wtr.write_blob(&vec![0; PAGE_SZ]).await?;
println!("Writing final blob at offs={offs}");
println!("wrote {}", wtr.size());
wtr.flush_buffer().await?;
}
println!("write buffered={} time: {:?}", BUFFERED, now.elapsed());
let now = Instant::now();
let file = VirtualFile::open(pathbuf.as_path()).await?;
let rdr = BlockReaderRef::VirtualFile(&file);
// let rdr = BlockReaderRef::VirtualFile(&file);
let rdr = FileBlockReader::new(file);
let rdr = BlockReaderRef::FileBlockReader(&rdr);
let rdr = BlockCursor::new(rdr);
let prof_guard = crate::profiling::profpoint_start();
for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
let blob_read = rdr.read_blob(*offset, &ctx).await?;
assert_eq!(
@@ -268,6 +278,9 @@ mod tests {
"mismatch for idx={idx} at offset={offset}"
);
}
drop(prof_guard);
println!("read buffered={} time: {:?}", BUFFERED, now.elapsed());
Ok(())
}
@@ -301,7 +314,7 @@ mod tests {
async fn test_really_big_array() -> Result<(), Error> {
let blobs = &[
b"test".to_vec(),
random_array(10 * PAGE_SZ),
random_array(10_000 * PAGE_SZ), // 80MB
b"foobar".to_vec(),
];
round_trip_test::<false>(blobs).await?;
@@ -321,19 +334,29 @@ mod tests {
#[tokio::test]
async fn test_arrays_random_size() -> Result<(), Error> {
let profiler_guard = crate::profiling::init_profiler();
// crate::page_cache::init(10000);
let mut rng = rand::rngs::StdRng::seed_from_u64(42);
let blobs = (0..1024)
let blobs = (0..1024*1024)
.map(|_| {
let mut sz: u16 = rng.gen();
// Make 50% of the arrays small
if rng.gen() {
sz |= 63;
if true || rng.gen() {
sz &= 63; // TODO why or? should be and?
}
random_array(sz.into())
})
.collect::<Vec<_>>();
let total_len: usize = blobs.iter().map(|b| b.len()).sum();
println!("total_len {}", total_len);
round_trip_test::<false>(&blobs).await?;
round_trip_test::<true>(&blobs).await?;
crate::profiling::exit_profiler(&profiler_guard);
Ok(())
}

View File

@@ -244,6 +244,9 @@ pub(super) async fn handle_walreceiver_connection(
info!("last_record_lsn {last_rec_lsn} starting replication from {startpoint}, safekeeper is at {end_of_wal}...");
// tokio::task::block_in_place(move || {
// tokio::runtime::Handle::current().block_on(async move {
let query = format!("START_REPLICATION PHYSICAL {startpoint}");
let copy_stream = replication_client.copy_both_simple(&query).await?;
@@ -293,6 +296,8 @@ pub(super) async fn handle_walreceiver_connection(
let status_update = match replication_message {
ReplicationMessage::XLogData(xlog_data) => {
let prof_guard = crate::profiling::profpoint_start();
// Pass the WAL data to the decoder, and see if we can decode
// more records as a result.
let data = xlog_data.data();
@@ -330,6 +335,7 @@ pub(super) async fn handle_walreceiver_connection(
caught_up = true;
}
drop(prof_guard);
Some(endlsn)
}
@@ -418,6 +424,11 @@ pub(super) async fn handle_walreceiver_connection(
}
}
// Ok(())
// })?;
// Ok::<(), WalReceiverError>(())
// })?;
Ok(())
}

Binary file not shown.