diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs index fe2204b539..c32e636626 100644 --- a/control_plane/src/compute.rs +++ b/control_plane/src/compute.rs @@ -1,4 +1,4 @@ -use std::fs::{self, OpenOptions}; +use std::fs::{self, File, OpenOptions}; use std::io::{Read, Write}; use std::net::SocketAddr; use std::net::TcpStream; @@ -399,6 +399,14 @@ impl PostgresNode { String::from_utf8(output.stdout).unwrap().trim().to_string() } + fn dump_log_file(&self) { + if let Ok(mut file) = File::open(self.env.repo_path.join("pageserver.log")) { + let mut buffer = String::new(); + file.read_to_string(&mut buffer).unwrap(); + println!("--------------- Dump pageserver.log:\n{}", buffer); + } + } + pub fn safe_psql(&self, db: &str, sql: &str) -> Vec { let connstring = format!( "host={} port={} dbname={} user={}", @@ -410,7 +418,11 @@ impl PostgresNode { let mut client = Client::connect(connstring.as_str(), NoTls).unwrap(); println!("Running {}", sql); - client.query(sql, &[]).unwrap() + let result = client.query(sql, &[]); + if result.is_err() { + self.dump_log_file(); + } + result.unwrap() } pub fn open_psql(&self, db: &str) -> Client { diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index 0a9eca57b3..8c16a1a926 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -172,7 +172,7 @@ fn open_rocksdb(_conf: &PageServerConf, timelineid: ZTimelineId) -> rocksdb::DB opts.create_if_missing(true); opts.set_use_fsync(true); opts.set_compression_type(rocksdb::DBCompressionType::Lz4); - opts.create_missing_column_families(true); + opts.create_missing_column_families(true); rocksdb::DB::open_cf(&opts, &path, &[rocksdb::DEFAULT_COLUMN_FAMILY_NAME]).unwrap() } @@ -429,7 +429,7 @@ impl PageCache { // reconstruct most recent page version if content.wal_record.is_some() { - trace!("Reconstruct most recent page {:?}", key); + trace!("Reconstruct most recent page {:?}", key); // force reconstruction of most recent page version self.reconstruct_page(key, content)?; } @@ -451,7 +451,7 @@ impl PageCache { minbuf.clear(); minbuf.extend_from_slice(&k); let key = CacheKey::unpack(&mut minbuf); - trace!("Reconstruct horizon page {:?}", key); + trace!("Reconstruct horizon page {:?}", key); self.reconstruct_page(key, content)?; } } @@ -459,13 +459,13 @@ impl PageCache { // remove records prior to horizon minbuf.clear(); minkey.pack(&mut minbuf); - trace!("Delete records in range {:?}..{:?}", minkey, maxkey); + trace!("Delete records in range {:?}..{:?}", minkey, maxkey); self.db.delete_range_cf(cf, &minbuf[..], &maxbuf[..])?; maxkey = minkey; } else { - break; - } + break; + } } } } @@ -497,35 +497,36 @@ impl PageCache { } async fn wait_lsn(&self, lsn: u64) -> anyhow::Result<()> { - let walreceiver_works = self.walreceiver_works.load(Ordering::Acquire); - if walreceiver_works { - self.seqwait_lsn - .wait_for_timeout(lsn, TIMEOUT) - .await - .with_context(|| { - format!( - "Timed out while waiting for WAL record at LSN {:X}/{:X} to arrive", - lsn >> 32, - lsn & 0xffff_ffff - ) - })?; - } else { - // There is a a race at postgres instance start - // when we request a page before walsender established connection - // and was able to stream the page. Just don't wait and return what we have. - // TODO is there any corner case when this is incorrect? - trace!( - "walreceiver doesn't work yet last_valid_lsn {}, requested {}", - self.last_valid_lsn.load(Ordering::Acquire), - lsn - ); + loop { + let walreceiver_works = self.walreceiver_works.load(Ordering::Acquire); + if walreceiver_works { + self.seqwait_lsn + .wait_for_timeout(lsn, TIMEOUT) + .await + .with_context(|| { + format!( + "Timed out while waiting for WAL record at LSN {:X}/{:X} to arrive", + lsn >> 32, + lsn & 0xffff_ffff + ) + })?; + break; + } else { + // There is a a race at postgres instance start + // when we request a page before walsender established connection + // and was able to stream the page. Just don't wait and return what we have. + // TODO is there any corner case when this is incorrect? + info!( + "walreceiver doesn't work yet last_valid_lsn {}, requested {}", + self.last_valid_lsn.load(Ordering::Acquire), + lsn + ); + tokio::time::sleep(Duration::from_secs(1)).await; + } } let shared = self.shared.lock().unwrap(); - - if walreceiver_works { - assert!(lsn <= shared.last_valid_lsn); - } + assert!(lsn <= shared.last_valid_lsn); Ok(()) } @@ -857,11 +858,13 @@ impl PageCache { } } let relsize = tag.blknum + 1; + trace!("Size of relation {:?} at {} is {}", rel, lsn, relsize); return Ok(relsize); } } break; } + trace!("Size of relation {:?} at {} is zero", rel, lsn); Ok(0) } @@ -886,9 +889,11 @@ impl PageCache { buf.extend_from_slice(&k); let tag = BufferTag::unpack(&mut buf); if tag.rel == *rel { + trace!("Relation {:?} exists at {}", rel, lsn); return Ok(true); } } + trace!("Relation {:?} doesn't exist at {}", rel, lsn); Ok(false) } diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index ba517b2138..a70fbb9c02 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -15,6 +15,7 @@ use crate::ZTimelineId; use anyhow::Error; use lazy_static::lazy_static; use log::*; +use postgres_ffi::xlog_utils::*; use postgres_protocol::message::backend::ReplicationMessage; use postgres_types::PgLsn; use std::collections::HashMap; @@ -30,7 +31,6 @@ use tokio::time::{sleep, Duration}; use tokio_postgres::replication::{PgTimestamp, ReplicationStream}; use tokio_postgres::{NoTls, SimpleQueryMessage, SimpleQueryRow}; use tokio_stream::StreamExt; -use postgres_ffi::xlog_utils::*; // // We keep one WAL Receiver active per timeline.