diff --git a/Cargo.lock b/Cargo.lock index e0f5b06f3b..947d31bfa4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -172,6 +172,17 @@ dependencies = [ "wildmatch", ] +[[package]] +name = "atty" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" +dependencies = [ + "hermit-abi", + "libc", + "winapi", +] + [[package]] name = "autocfg" version = "1.0.1" @@ -727,7 +738,7 @@ dependencies = [ "httpdate", "itoa", "pin-project", - "socket2", + "socket2 0.3.19", "tokio", "tower-service", "tracing", @@ -903,7 +914,7 @@ version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a33c1b55807fbed163481b5ba66db4b2fa6cde694a5027be10fb724206c5897" dependencies = [ - "socket2", + "socket2 0.3.19", "winapi", ] @@ -927,12 +938,12 @@ dependencies = [ [[package]] name = "nb-connect" -version = "1.0.3" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "670361df1bc2399ee1ff50406a0d422587dd3bb0da596e1978fe8e05dabddf4f" +checksum = "a19900e7eee95eb2b3c2e26d12a874cc80aaf750e31be6fcbe743ead369fa45d" dependencies = [ "libc", - "socket2", + "socket2 0.4.0", ] [[package]] @@ -1036,11 +1047,13 @@ dependencies = [ "bytes", "futures", "lazy_static", + "log", "postgres", "postgres-protocol", "rand 0.8.3", "regex", "rust-s3", + "stderrlog", "tokio", "tokio-postgres", "tokio-stream", @@ -1141,11 +1154,11 @@ checksum = "3831453b3449ceb48b6d9c7ad7c96d5ea673e9b470a1dc578c2ce6521230884c" [[package]] name = "polling" -version = "2.0.2" +version = "2.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2a7bc6b2a29e632e45451c941832803a18cce6781db04de8a04696cdca8bde4" +checksum = "4fc12d774e799ee9ebae13f4076ca003b40d18a11ac0f3641e6f899618580b7b" dependencies = [ - "cfg-if 0.1.10", + "cfg-if 1.0.0", "libc", "log", "wepoll-sys", @@ -1576,9 +1589,9 @@ checksum = "2a30f10c911c0355f80f1c2faa8096efc4a58cdf8590b954d5b395efa071c711" [[package]] name = "siphasher" -version = "0.3.3" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa8f3741c7372e75519bd9346068370c9cdaabcc1f9599cbcf2a2719352286b7" +checksum = "cbce6d4507c7e4a3962091436e56e95290cb71fa302d0d270e32130b75fbff27" [[package]] name = "slab" @@ -1603,6 +1616,29 @@ dependencies = [ "winapi", ] +[[package]] +name = "socket2" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e3dfc207c526015c632472a77be09cf1b6e46866581aecae5cc38fb4235dea2" +dependencies = [ + "libc", + "winapi", +] + +[[package]] +name = "stderrlog" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45a53e2eff3e94a019afa6265e8ee04cb05b9d33fe9f5078b14e4e391d155a38" +dependencies = [ + "atty", + "chrono", + "log", + "termcolor", + "thread_local", +] + [[package]] name = "stringprep" version = "0.1.2" @@ -1644,6 +1680,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "termcolor" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dfed899f0eb03f32ee8c6a0aabdb8a7949659e3466561fc0adf54e26d88c5f4" +dependencies = [ + "winapi-util", +] + [[package]] name = "thiserror" version = "1.0.24" @@ -1664,6 +1709,15 @@ dependencies = [ "syn", ] +[[package]] +name = "thread_local" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d40c6d1b69745a6ec6fb1ca717914848da4b44ae29d9b3080cbee91d72a69b14" +dependencies = [ + "lazy_static", +] + [[package]] name = "time" version = "0.1.43" @@ -1691,9 +1745,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" -version = "1.3.0" +version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d56477f6ed99e10225f38f9f75f872f29b8b8bd8c0b946f63345bb144e9eeda" +checksum = "134af885d758d645f0f0505c9a8b3f9bf8a348fd822e112ab5248138348f1722" dependencies = [ "autocfg", "bytes", @@ -1747,16 +1801,16 @@ dependencies = [ "pin-project", "postgres-protocol", "postgres-types", - "socket2", + "socket2 0.3.19", "tokio", "tokio-util", ] [[package]] name = "tokio-stream" -version = "0.1.4" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c535f53c0cfa1acace62995a8994fc9cc1f12d202420da96ff306ee24d576469" +checksum = "e177a5d8c3bf36de9ebe6d58537d8879e964332f93fb3339e43f618c81361af0" dependencies = [ "futures-core", "pin-project-lite", @@ -1765,9 +1819,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.6.4" +version = "0.6.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec31e5cc6b46e653cf57762f36f71d5e6386391d88a72fd6db4508f8f676fb29" +checksum = "5143d049e85af7fbc36f5454d990e62c2df705b3589f123b71f441b6b59f443f" dependencies = [ "bytes", "futures-core", @@ -1868,9 +1922,9 @@ checksum = "b00bca6106a5e23f3eee943593759b7fcddb00554332e856d990c893966879fb" [[package]] name = "vec-arena" -version = "1.0.0" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eafc1b9b2dfc6f5529177b62cf806484db55b32dc7c9658a118e11bbeb33061d" +checksum = "34b2f665b594b07095e3ac3f718e13c2197143416fae4c5706cffb7b1af8d7f1" [[package]] name = "version_check" @@ -2015,6 +2069,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" +[[package]] +name = "winapi-util" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" +dependencies = [ + "winapi", +] + [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" diff --git a/Cargo.toml b/Cargo.toml index 718c002848..ea4024534d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,8 @@ bytes = "1.0.1" byteorder = "1.4.3" futures = "0.3.13" lazy_static = "1.4.0" +log = "0.4.14" +stderrlog = "0.5.1" rust-s3 = { git = "https://github.com/hlinnaka/rust-s3", features = ["no-verify-ssl"] } tokio = { version = "1.3.0", features = ["full"] } tokio-stream = { version = "0.1.4" } diff --git a/src/bin/pageserver.rs b/src/bin/pageserver.rs index 1838e41991..8e26ade010 100644 --- a/src/bin/pageserver.rs +++ b/src/bin/pageserver.rs @@ -3,6 +3,7 @@ // use std::thread; +use log::*; use pageserver::page_service; use pageserver::restore_s3; @@ -12,6 +13,12 @@ use std::io::Error; fn main() -> Result<(), Error> { + stderrlog::new() + .verbosity(3) + .module("pageserver") + .init().unwrap(); + info!("starting..."); + // First, restore the latest base backup from S3. (We don't persist anything // to local disk at the moment, so we need to do this at every startup) restore_s3::restore_main(); diff --git a/src/page_cache.rs b/src/page_cache.rs index 2c5ff85f17..6db4defd91 100644 --- a/src/page_cache.rs +++ b/src/page_cache.rs @@ -10,6 +10,7 @@ use std::sync::Mutex; use bytes::Bytes; use lazy_static::lazy_static; use rand::Rng; +use log::*; use crate::walredo; @@ -150,7 +151,7 @@ pub fn get_page_at_lsn(tag: BufferTag, lsn: u64) -> Result records.push(rec.clone()); if rec.will_init { - println!("WAL record at LSN {} initializes the page", rec.lsn); + debug!("WAL record at LSN {} initializes the page", rec.lsn); } } } @@ -163,7 +164,7 @@ pub fn get_page_at_lsn(tag: BufferTag, lsn: u64) -> Result page_img = walredo::apply_wal_records(tag, base_img, &records)?; - println!("applied {} WAL records to produce page image at LSN {}", records.len(), lsn); + debug!("applied {} WAL records to produce page image at LSN {}", records.len(), lsn); // Here, we could put the new page image back to the page cache, to save effort if the // same (or later) page version is requested again. It's a tradeoff though, as each @@ -227,8 +228,8 @@ pub fn put_page_image(tag: BufferTag, lsn: u64, img: Bytes) let oldentry = pagecache.insert(key, entry); assert!(oldentry.is_none()); - //println!("inserted page image for {}/{}/{}_{} blk {} at {}", - // tag.spcnode, tag.dbnode, tag.relnode, tag.forknum, tag.blknum, lsn); + debug!("inserted page image for {}/{}/{}_{} blk {} at {}", + tag.spcnode, tag.dbnode, tag.relnode, tag.forknum, tag.blknum, lsn); } // @@ -297,7 +298,7 @@ pub fn _test_get_page_at_lsn() let pagecache = &shared.pagecache; if pagecache.is_empty() { - println!("page cache is empty"); + info!("page cache is empty"); return; } @@ -313,18 +314,22 @@ pub fn _test_get_page_at_lsn() } } - println!("testing GetPage@LSN for block {}", tag.unwrap().blknum); + info!("testing GetPage@LSN for block {}", tag.unwrap().blknum); match get_page_at_lsn(tag.unwrap(), 0xffff_ffff_ffff_eeee) { Ok(_img) => { // This prints out the whole page image. //println!("{:X?}", img); }, Err(error) => { - println!("GetPage@LSN failed: {}", error); + error!("GetPage@LSN failed: {}", error); } } } + +// FIXME: Shouldn't relation size also be tracked with an LSN? +// If a replica is lagging behind, it needs to get the size as it was on +// the replica's current replay LSN. pub fn relsize_inc(rel: &RelTag, to: Option) { let mut shared = PAGECACHE.lock().unwrap(); diff --git a/src/page_service.rs b/src/page_service.rs index 8c57bacfef..2647dadbcd 100644 --- a/src/page_service.rs +++ b/src/page_service.rs @@ -16,6 +16,7 @@ use tokio::task; use byteorder::{BigEndian, ByteOrder}; use bytes::{Buf, Bytes, BytesMut}; use std::io; +use log::*; use crate::page_cache; @@ -210,7 +211,7 @@ pub fn thread_main() { let runtime = runtime::Runtime::new().unwrap(); let listen_address = "127.0.0.1:5430"; - println!("Starting page server on {}", listen_address); + info!("Starting page server on {}", listen_address); runtime.block_on(async { let _unused = page_service_main(listen_address).await; @@ -228,7 +229,7 @@ async fn page_service_main(listen_address: &str) { task::spawn(async move { if let Err(err) = conn_handler.run().await { - println!("error: {}", err); + error!("error: {}", err); } }); } @@ -373,7 +374,7 @@ impl Connection { match self.read_message().await? { Some(FeMessage::StartupMessage(m)) => { - println!("got message {:?}", m); + trace!("got message {:?}", m); match m.kind { StartupRequestCode::NegotiateGss | StartupRequestCode::NegotiateSsl => { @@ -396,7 +397,7 @@ impl Connection { break; } None => { - println!("connection closed"); + info!("connection closed"); break; } _ => { @@ -409,7 +410,7 @@ impl Connection { } async fn process_query(&mut self, q : &FeQueryMessage) -> Result<()> { - println!("got query {:?}", q.body); + trace!("got query {:?}", q.body); if q.body.starts_with(b"pagestream") { self.handle_pagerequests().await diff --git a/src/restore_s3.rs b/src/restore_s3.rs index 48e05a96ce..53afcfd1f7 100644 --- a/src/restore_s3.rs +++ b/src/restore_s3.rs @@ -11,6 +11,7 @@ use std::env; use std::fmt; use regex::Regex; use bytes::{BytesMut, Buf}; +use log::*; use s3::bucket::Bucket; use s3::creds::Credentials; @@ -39,7 +40,7 @@ pub fn restore_main() { match result { Ok(_) => { return; }, Err(err) => { - println!("S3 error: {}", err); + error!("S3 error: {}", err); return; } } @@ -70,7 +71,7 @@ async fn restore_chunk() -> Result<(), S3Error> { bucket: "zenith-testbucket".to_string() }; - println!("Restoring from S3..."); + info!("Restoring from S3..."); // Create Bucket in REGION for BUCKET let bucket = Bucket::new_with_path_style(&backend.bucket, backend.region, backend.credentials)?; @@ -101,7 +102,7 @@ async fn restore_chunk() -> Result<(), S3Error> { slurp_futures.push(f); } - Err(e) => { println!("unrecognized file: {} ({})", relpath, e); } + Err(e) => { warn!("unrecognized file: {} ({})", relpath, e); } }; } } @@ -111,10 +112,10 @@ async fn restore_chunk() -> Result<(), S3Error> { } page_cache::init_valid_lsn(oldest_lsn); - println!("{} files to read...", slurp_futures.len()); + info!("{} files to read...", slurp_futures.len()); future::join_all(slurp_futures).await; - println!("restored!"); + info!("restored!"); Ok(()) } @@ -271,7 +272,7 @@ async fn slurp_base_file(bucket: Bucket, s3path: String, parsed: ParsedBaseImage // the reqwest::Client object. But that requires changes to rust-s3 itself. let (data, code) = bucket.get_object(s3path.clone()).await.unwrap(); - //println!("got response: {} on {}", code, &s3path); + trace!("got response: {} on {}", code, &s3path); assert_eq!(200, code); let mut bytes = BytesMut::from(data.as_slice()).freeze(); diff --git a/src/walreceiver.rs b/src/walreceiver.rs index 84a9ce94a8..e41413e0c0 100644 --- a/src/walreceiver.rs +++ b/src/walreceiver.rs @@ -5,6 +5,7 @@ // For each WAL record, it decodes the record to figure out which data blocks // the record affects, and adds the records to the page cache. // +use log::*; use tokio_stream::StreamExt; use tokio::runtime; @@ -22,7 +23,7 @@ use postgres_protocol::message::backend::ReplicationMessage; // pub fn thread_main() { - println!("Starting WAL receiver"); + info!("Starting WAL receiver"); let runtime = runtime::Builder::new_current_thread() .enable_all() @@ -34,7 +35,7 @@ pub fn thread_main() { let _res = walreceiver_main().await; // TODO: print/log the error - println!("WAL streaming connection failed, retrying in 5 seconds..."); + info!("WAL streaming connection failed, retrying in 5 seconds..."); sleep(Duration::from_secs(5)).await; } }); @@ -43,17 +44,17 @@ pub fn thread_main() { async fn walreceiver_main() -> Result<(), Error> { // Connect to the database in replication mode. - println!("connecting..."); + debug!("connecting..."); let (mut rclient, connection) = connect_replication("host=localhost user=zenith port=65432", NoTls, ReplicationMode::Physical).await?; - println!("connected!"); - + debug!("connected!"); + // The connection object performs the actual communication with the database, // so spawn it off to run on its own. tokio::spawn(async move { if let Err(e) = connection.await { - eprintln!("connection error: {}", e); + error!("connection error: {}", e); } }); @@ -75,7 +76,7 @@ async fn walreceiver_main() -> Result<(), Error> { match replication_message? { ReplicationMessage::XLogData(xlog_data) => { - println!("received XLogData"); + trace!("received XLogData"); // Pass the WAL data to the decoder, and see if we can decode // more records as a result. @@ -85,7 +86,6 @@ async fn walreceiver_main() -> Result<(), Error> { if let Some((lsn, recdata)) = waldecoder.poll_decode() { let decoded = crate::waldecoder::decode_wal_record(lsn, recdata.clone()); - println!("decoded record"); // Put the WAL record to the page cache. We make a separate copy of // it for every block it modifes. (The actual WAL record is kept in @@ -119,7 +119,8 @@ async fn walreceiver_main() -> Result<(), Error> { } } ReplicationMessage::PrimaryKeepAlive(_keepalive) => { - println!("received PrimaryKeepAlive"); + trace!("received PrimaryKeepAlive"); + // FIXME: Reply, or the connection will time out } _ => (), }