Add 'walreceiver-after-ingest' failpoint. Use sleep at this point to imitate slow walreceiver.

This commit is contained in:
anastasia
2022-02-21 03:13:28 +03:00
committed by Anastasia Lubennikova
parent 993b544ad0
commit 1a4682a04a
4 changed files with 28 additions and 0 deletions

12
Cargo.lock generated
View File

@@ -552,6 +552,17 @@ dependencies = [
"termcolor",
]
[[package]]
name = "fail"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec3245a0ca564e7f3c797d20d833a6870f57a728ac967d5225b3ffdef4465011"
dependencies = [
"lazy_static",
"log",
"rand",
]
[[package]]
name = "fallible-iterator"
version = "0.2.0"
@@ -1289,6 +1300,7 @@ dependencies = [
"crc32c",
"crossbeam-utils",
"daemonize",
"fail",
"futures",
"hex",
"hex-literal",

View File

@@ -41,6 +41,7 @@ url = "2"
nix = "0.23"
once_cell = "1.8.0"
crossbeam-utils = "0.8.5"
fail = "0.5.0"
rust-s3 = { version = "0.28", default-features = false, features = ["no-verify-ssl", "tokio-rustls-tls"] }
async-compression = {version = "0.3", features = ["zstd", "tokio"]}

View File

@@ -663,6 +663,17 @@ impl postgres_backend::Handler for PageServerHandler {
// important because psycopg2 executes "SET datestyle TO 'ISO'"
// on connect
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("failpoints ") {
let (_, failpoints) = query_string.split_at("failpoints ".len());
for failpoint in failpoints.split(';') {
if let Some((name, actions)) = failpoint.split_once('=') {
info!("cfg failpoint: {} {}", name, actions);
fail::cfg(name, actions).unwrap();
} else {
bail!("Invalid failpoints format");
}
}
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("do_gc ") {
// Run GC immediately on given timeline.
// FIXME: This is just for tests. See test_runner/batch_others/test_gc.py.

View File

@@ -12,6 +12,7 @@ use crate::thread_mgr::ThreadKind;
use crate::walingest::WalIngest;
use anyhow::{bail, Context, Error, Result};
use bytes::BytesMut;
use fail::fail_point;
use lazy_static::lazy_static;
use postgres_ffi::waldecoder::*;
use postgres_protocol::message::backend::ReplicationMessage;
@@ -31,6 +32,7 @@ use zenith_utils::lsn::Lsn;
use zenith_utils::pq_proto::ZenithFeedback;
use zenith_utils::zid::ZTenantId;
use zenith_utils::zid::ZTimelineId;
//
// We keep one WAL Receiver active per timeline.
//
@@ -254,6 +256,8 @@ fn walreceiver_main(
let writer = timeline.writer();
walingest.ingest_record(writer.as_ref(), recdata, lsn)?;
fail_point!("walreceiver-after-ingest");
last_rec_lsn = lsn;
}