From ee87e6aad3f00ababea705a6c150e4825e7548f3 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Thu, 22 Apr 2021 22:14:41 +0300 Subject: [PATCH 1/5] Sum log files in case of test failure --- control_plane/src/compute.rs | 20 ++++++++++++++++++-- pageserver/src/page_cache.rs | 16 ++++++++++------ pageserver/src/walreceiver.rs | 2 +- 3 files changed, 29 insertions(+), 9 deletions(-) diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs index fe2204b539..6af7427294 100644 --- a/control_plane/src/compute.rs +++ b/control_plane/src/compute.rs @@ -1,4 +1,4 @@ -use std::fs::{self, OpenOptions}; +use std::fs::{self, File, OpenOptions}; use std::io::{Read, Write}; use std::net::SocketAddr; use std::net::TcpStream; @@ -399,6 +399,18 @@ impl PostgresNode { String::from_utf8(output.stdout).unwrap().trim().to_string() } + fn dump_log_files(&self) { + let dir = zenith_repo_dir(); + for entry in fs::read_dir(dir).unwrap() { + let entry = entry.unwrap(); + let path = entry.path(); + let mut file = File::open(&path).unwrap(); + let mut buffer = String::new(); + file.read_to_string(&mut buffer).unwrap(); + println!("File {:?}:\n{}", &path, buffer); + } + } + pub fn safe_psql(&self, db: &str, sql: &str) -> Vec { let connstring = format!( "host={} port={} dbname={} user={}", @@ -410,7 +422,11 @@ impl PostgresNode { let mut client = Client::connect(connstring.as_str(), NoTls).unwrap(); println!("Running {}", sql); - client.query(sql, &[]).unwrap() + let result = client.query(sql, &[]); + if result.is_err() { + self.dump_log_files(); + } + result.unwrap() } pub fn open_psql(&self, db: &str) -> Client { diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index 0a9eca57b3..54d4ca63fd 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -172,7 +172,7 @@ fn open_rocksdb(_conf: &PageServerConf, timelineid: ZTimelineId) -> rocksdb::DB opts.create_if_missing(true); opts.set_use_fsync(true); opts.set_compression_type(rocksdb::DBCompressionType::Lz4); - opts.create_missing_column_families(true); + opts.create_missing_column_families(true); rocksdb::DB::open_cf(&opts, &path, &[rocksdb::DEFAULT_COLUMN_FAMILY_NAME]).unwrap() } @@ -429,7 +429,7 @@ impl PageCache { // reconstruct most recent page version if content.wal_record.is_some() { - trace!("Reconstruct most recent page {:?}", key); + trace!("Reconstruct most recent page {:?}", key); // force reconstruction of most recent page version self.reconstruct_page(key, content)?; } @@ -451,7 +451,7 @@ impl PageCache { minbuf.clear(); minbuf.extend_from_slice(&k); let key = CacheKey::unpack(&mut minbuf); - trace!("Reconstruct horizon page {:?}", key); + trace!("Reconstruct horizon page {:?}", key); self.reconstruct_page(key, content)?; } } @@ -459,13 +459,13 @@ impl PageCache { // remove records prior to horizon minbuf.clear(); minkey.pack(&mut minbuf); - trace!("Delete records in range {:?}..{:?}", minkey, maxkey); + trace!("Delete records in range {:?}..{:?}", minkey, maxkey); self.db.delete_range_cf(cf, &minbuf[..], &maxbuf[..])?; maxkey = minkey; } else { - break; - } + break; + } } } } @@ -857,11 +857,13 @@ impl PageCache { } } let relsize = tag.blknum + 1; + info!("Size of relation {:?} at {} is {}", rel, lsn, relsize); return Ok(relsize); } } break; } + info!("Size of relation {:?} at {} is zero", rel, lsn); Ok(0) } @@ -886,9 +888,11 @@ impl PageCache { buf.extend_from_slice(&k); let tag = BufferTag::unpack(&mut buf); if tag.rel == *rel { + info!("Relation {:?} exists at {}", rel, lsn); return Ok(true); } } + info!("Relation {:?} doesn't exist at {}", rel, lsn); Ok(false) } diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index ba517b2138..a70fbb9c02 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -15,6 +15,7 @@ use crate::ZTimelineId; use anyhow::Error; use lazy_static::lazy_static; use log::*; +use postgres_ffi::xlog_utils::*; use postgres_protocol::message::backend::ReplicationMessage; use postgres_types::PgLsn; use std::collections::HashMap; @@ -30,7 +31,6 @@ use tokio::time::{sleep, Duration}; use tokio_postgres::replication::{PgTimestamp, ReplicationStream}; use tokio_postgres::{NoTls, SimpleQueryMessage, SimpleQueryRow}; use tokio_stream::StreamExt; -use postgres_ffi::xlog_utils::*; // // We keep one WAL Receiver active per timeline. From 5f277755b16cf8646d072c0b8b22ccfb15d16d84 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Thu, 22 Apr 2021 22:27:12 +0300 Subject: [PATCH 2/5] Sum log files in case of test failure --- control_plane/src/compute.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs index 6af7427294..1f510b1582 100644 --- a/control_plane/src/compute.rs +++ b/control_plane/src/compute.rs @@ -404,10 +404,13 @@ impl PostgresNode { for entry in fs::read_dir(dir).unwrap() { let entry = entry.unwrap(); let path = entry.path(); - let mut file = File::open(&path).unwrap(); - let mut buffer = String::new(); - file.read_to_string(&mut buffer).unwrap(); - println!("File {:?}:\n{}", &path, buffer); + if path.is_dir() { + if let Ok(mut file) = File::open(path.join("pageserver.log")) { + let mut buffer = String::new(); + file.read_to_string(&mut buffer).unwrap(); + println!("File {:?}:\n{}", &path, buffer); + } + } } } From db5712f28b3eb154f52e71d8d25cc482fee20b12 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Fri, 23 Apr 2021 09:41:08 +0300 Subject: [PATCH 3/5] Dump pageserver.log in case of test errors --- control_plane/src/compute.rs | 21 +++++++-------------- 1 file changed, 7 insertions(+), 14 deletions(-) diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs index 1f510b1582..1fb64e1a72 100644 --- a/control_plane/src/compute.rs +++ b/control_plane/src/compute.rs @@ -399,19 +399,12 @@ impl PostgresNode { String::from_utf8(output.stdout).unwrap().trim().to_string() } - fn dump_log_files(&self) { - let dir = zenith_repo_dir(); - for entry in fs::read_dir(dir).unwrap() { - let entry = entry.unwrap(); - let path = entry.path(); - if path.is_dir() { - if let Ok(mut file) = File::open(path.join("pageserver.log")) { - let mut buffer = String::new(); - file.read_to_string(&mut buffer).unwrap(); - println!("File {:?}:\n{}", &path, buffer); - } - } - } + fn dump_log_file(&self) { + if let Ok(mut file) = File::open(zenith_repo_dir().join("pageserver.log")) { + let mut buffer = String::new(); + file.read_to_string(&mut buffer).unwrap(); + println!("--------------- Dump pageserver.log:\n{}", buffer); + } } pub fn safe_psql(&self, db: &str, sql: &str) -> Vec { @@ -427,7 +420,7 @@ impl PostgresNode { println!("Running {}", sql); let result = client.query(sql, &[]); if result.is_err() { - self.dump_log_files(); + self.dump_log_file(); } result.unwrap() } From 0eaff5aa7f46d91b38cb4370e36e5699a6687379 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Fri, 23 Apr 2021 11:37:28 +0300 Subject: [PATCH 4/5] Fix pageserver.log path --- control_plane/src/compute.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs index 1fb64e1a72..c3f04aeb41 100644 --- a/control_plane/src/compute.rs +++ b/control_plane/src/compute.rs @@ -400,7 +400,7 @@ impl PostgresNode { } fn dump_log_file(&self) { - if let Ok(mut file) = File::open(zenith_repo_dir().join("pageserver.log")) { + if let Ok(mut file) = File::open(self.env.repo_path.join("pageserver.log")) { let mut buffer = String::new(); file.read_to_string(&mut buffer).unwrap(); println!("--------------- Dump pageserver.log:\n{}", buffer); From 59b23fef64a66d3e40422f0a040331f889410b82 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Fri, 23 Apr 2021 12:40:29 +0300 Subject: [PATCH 5/5] Wait for WAL receiver to start --- control_plane/src/compute.rs | 10 +++--- pageserver/src/page_cache.rs | 61 ++++++++++++++++++------------------ 2 files changed, 36 insertions(+), 35 deletions(-) diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs index c3f04aeb41..c32e636626 100644 --- a/control_plane/src/compute.rs +++ b/control_plane/src/compute.rs @@ -400,11 +400,11 @@ impl PostgresNode { } fn dump_log_file(&self) { - if let Ok(mut file) = File::open(self.env.repo_path.join("pageserver.log")) { - let mut buffer = String::new(); - file.read_to_string(&mut buffer).unwrap(); - println!("--------------- Dump pageserver.log:\n{}", buffer); - } + if let Ok(mut file) = File::open(self.env.repo_path.join("pageserver.log")) { + let mut buffer = String::new(); + file.read_to_string(&mut buffer).unwrap(); + println!("--------------- Dump pageserver.log:\n{}", buffer); + } } pub fn safe_psql(&self, db: &str, sql: &str) -> Vec { diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index 54d4ca63fd..8c16a1a926 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -497,35 +497,36 @@ impl PageCache { } async fn wait_lsn(&self, lsn: u64) -> anyhow::Result<()> { - let walreceiver_works = self.walreceiver_works.load(Ordering::Acquire); - if walreceiver_works { - self.seqwait_lsn - .wait_for_timeout(lsn, TIMEOUT) - .await - .with_context(|| { - format!( - "Timed out while waiting for WAL record at LSN {:X}/{:X} to arrive", - lsn >> 32, - lsn & 0xffff_ffff - ) - })?; - } else { - // There is a a race at postgres instance start - // when we request a page before walsender established connection - // and was able to stream the page. Just don't wait and return what we have. - // TODO is there any corner case when this is incorrect? - trace!( - "walreceiver doesn't work yet last_valid_lsn {}, requested {}", - self.last_valid_lsn.load(Ordering::Acquire), - lsn - ); + loop { + let walreceiver_works = self.walreceiver_works.load(Ordering::Acquire); + if walreceiver_works { + self.seqwait_lsn + .wait_for_timeout(lsn, TIMEOUT) + .await + .with_context(|| { + format!( + "Timed out while waiting for WAL record at LSN {:X}/{:X} to arrive", + lsn >> 32, + lsn & 0xffff_ffff + ) + })?; + break; + } else { + // There is a a race at postgres instance start + // when we request a page before walsender established connection + // and was able to stream the page. Just don't wait and return what we have. + // TODO is there any corner case when this is incorrect? + info!( + "walreceiver doesn't work yet last_valid_lsn {}, requested {}", + self.last_valid_lsn.load(Ordering::Acquire), + lsn + ); + tokio::time::sleep(Duration::from_secs(1)).await; + } } let shared = self.shared.lock().unwrap(); - - if walreceiver_works { - assert!(lsn <= shared.last_valid_lsn); - } + assert!(lsn <= shared.last_valid_lsn); Ok(()) } @@ -857,13 +858,13 @@ impl PageCache { } } let relsize = tag.blknum + 1; - info!("Size of relation {:?} at {} is {}", rel, lsn, relsize); + trace!("Size of relation {:?} at {} is {}", rel, lsn, relsize); return Ok(relsize); } } break; } - info!("Size of relation {:?} at {} is zero", rel, lsn); + trace!("Size of relation {:?} at {} is zero", rel, lsn); Ok(0) } @@ -888,11 +889,11 @@ impl PageCache { buf.extend_from_slice(&k); let tag = BufferTag::unpack(&mut buf); if tag.rel == *rel { - info!("Relation {:?} exists at {}", rel, lsn); + trace!("Relation {:?} exists at {}", rel, lsn); return Ok(true); } } - info!("Relation {:?} doesn't exist at {}", rel, lsn); + trace!("Relation {:?} doesn't exist at {}", rel, lsn); Ok(false) }