Merge pull request #62 from zenithdb/dump_log_files

Wait for the WAL receiver to start
This commit is contained in:
Konstantin Knizhnik
2021-04-23 12:45:59 +03:00
committed by GitHub
3 changed files with 52 additions and 35 deletions

View File

@@ -1,4 +1,4 @@
use std::fs::{self, OpenOptions};
use std::fs::{self, File, OpenOptions};
use std::io::{Read, Write};
use std::net::SocketAddr;
use std::net::TcpStream;
@@ -399,6 +399,14 @@ impl PostgresNode {
String::from_utf8(output.stdout).unwrap().trim().to_string()
}
fn dump_log_file(&self) {
if let Ok(mut file) = File::open(self.env.repo_path.join("pageserver.log")) {
let mut buffer = String::new();
file.read_to_string(&mut buffer).unwrap();
println!("--------------- Dump pageserver.log:\n{}", buffer);
}
}
pub fn safe_psql(&self, db: &str, sql: &str) -> Vec<tokio_postgres::Row> {
let connstring = format!(
"host={} port={} dbname={} user={}",
@@ -410,7 +418,11 @@ impl PostgresNode {
let mut client = Client::connect(connstring.as_str(), NoTls).unwrap();
println!("Running {}", sql);
client.query(sql, &[]).unwrap()
let result = client.query(sql, &[]);
if result.is_err() {
self.dump_log_file();
}
result.unwrap()
}
pub fn open_psql(&self, db: &str) -> Client {

View File

@@ -172,7 +172,7 @@ fn open_rocksdb(_conf: &PageServerConf, timelineid: ZTimelineId) -> rocksdb::DB
opts.create_if_missing(true);
opts.set_use_fsync(true);
opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
opts.create_missing_column_families(true);
opts.create_missing_column_families(true);
rocksdb::DB::open_cf(&opts, &path, &[rocksdb::DEFAULT_COLUMN_FAMILY_NAME]).unwrap()
}
@@ -429,7 +429,7 @@ impl PageCache {
// reconstruct most recent page version
if content.wal_record.is_some() {
trace!("Reconstruct most recent page {:?}", key);
trace!("Reconstruct most recent page {:?}", key);
// force reconstruction of most recent page version
self.reconstruct_page(key, content)?;
}
@@ -451,7 +451,7 @@ impl PageCache {
minbuf.clear();
minbuf.extend_from_slice(&k);
let key = CacheKey::unpack(&mut minbuf);
trace!("Reconstruct horizon page {:?}", key);
trace!("Reconstruct horizon page {:?}", key);
self.reconstruct_page(key, content)?;
}
}
@@ -459,13 +459,13 @@ impl PageCache {
// remove records prior to horizon
minbuf.clear();
minkey.pack(&mut minbuf);
trace!("Delete records in range {:?}..{:?}", minkey, maxkey);
trace!("Delete records in range {:?}..{:?}", minkey, maxkey);
self.db.delete_range_cf(cf, &minbuf[..], &maxbuf[..])?;
maxkey = minkey;
} else {
break;
}
break;
}
}
}
}
@@ -497,35 +497,36 @@ impl PageCache {
}
async fn wait_lsn(&self, lsn: u64) -> anyhow::Result<()> {
let walreceiver_works = self.walreceiver_works.load(Ordering::Acquire);
if walreceiver_works {
self.seqwait_lsn
.wait_for_timeout(lsn, TIMEOUT)
.await
.with_context(|| {
format!(
"Timed out while waiting for WAL record at LSN {:X}/{:X} to arrive",
lsn >> 32,
lsn & 0xffff_ffff
)
})?;
} else {
// There is a race at postgres instance start
// when we request a page before the walsender has established a connection
// and streamed the page. Just don't wait and return what we have.
// TODO is there any corner case when this is incorrect?
trace!(
"walreceiver doesn't work yet last_valid_lsn {}, requested {}",
self.last_valid_lsn.load(Ordering::Acquire),
lsn
);
loop {
let walreceiver_works = self.walreceiver_works.load(Ordering::Acquire);
if walreceiver_works {
self.seqwait_lsn
.wait_for_timeout(lsn, TIMEOUT)
.await
.with_context(|| {
format!(
"Timed out while waiting for WAL record at LSN {:X}/{:X} to arrive",
lsn >> 32,
lsn & 0xffff_ffff
)
})?;
break;
} else {
// There is a race at postgres instance start
// when we request a page before the walsender has established a connection
// and streamed the page. Just don't wait and return what we have.
// TODO is there any corner case when this is incorrect?
info!(
"walreceiver doesn't work yet last_valid_lsn {}, requested {}",
self.last_valid_lsn.load(Ordering::Acquire),
lsn
);
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
let shared = self.shared.lock().unwrap();
if walreceiver_works {
assert!(lsn <= shared.last_valid_lsn);
}
assert!(lsn <= shared.last_valid_lsn);
Ok(())
}
@@ -857,11 +858,13 @@ impl PageCache {
}
}
let relsize = tag.blknum + 1;
trace!("Size of relation {:?} at {} is {}", rel, lsn, relsize);
return Ok(relsize);
}
}
break;
}
trace!("Size of relation {:?} at {} is zero", rel, lsn);
Ok(0)
}
@@ -886,9 +889,11 @@ impl PageCache {
buf.extend_from_slice(&k);
let tag = BufferTag::unpack(&mut buf);
if tag.rel == *rel {
trace!("Relation {:?} exists at {}", rel, lsn);
return Ok(true);
}
}
trace!("Relation {:?} doesn't exist at {}", rel, lsn);
Ok(false)
}

View File

@@ -15,6 +15,7 @@ use crate::ZTimelineId;
use anyhow::Error;
use lazy_static::lazy_static;
use log::*;
use postgres_ffi::xlog_utils::*;
use postgres_protocol::message::backend::ReplicationMessage;
use postgres_types::PgLsn;
use std::collections::HashMap;
@@ -30,7 +31,6 @@ use tokio::time::{sleep, Duration};
use tokio_postgres::replication::{PgTimestamp, ReplicationStream};
use tokio_postgres::{NoTls, SimpleQueryMessage, SimpleQueryRow};
use tokio_stream::StreamExt;
use postgres_ffi::xlog_utils::*;
//
// We keep one WAL Receiver active per timeline.