From 250ae643a8d8a967cb491e918d07d598bb0c6cdc Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas
Date: Wed, 25 Aug 2021 18:05:58 +0300
Subject: [PATCH] Remove 'zenith push' feature.

Now that the new storage format is based on immutable files, we want to
implement push/pull in terms of those immutable files as well, similarly
to how the files will be transferred between S3 and the page server.

The implementation we had was fairly tightly coupled with the object
repository implementation, and I'm about to remove the object / rocksdb
storage format soon. That would leave the current "zenith push" command
completely broken. It seemed like a good idea at the time, but in
hindsight it was premature to implement push/pull. It's a nice feature
and I'd like to see it reimplemented in the future, but in the meanwhile,
let's remove the code we had. We can dig up the parts that might be
useful from the git history.
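
As a rough sketch of the direction: a file-based push could simply
enumerate the immutable layer files of a timeline and copy each one to
the receiving side, the same way an S3 upload would. Everything below is
hypothetical (the RemoteStorage trait, push_timeline, and the directory
layout are placeholders, not existing code):

    use std::path::{Path, PathBuf};

    /// Hypothetical interface for the receiving side, whether that is
    /// another pageserver or an S3 bucket.
    trait RemoteStorage {
        fn upload(&self, local_path: &Path) -> anyhow::Result<()>;
    }

    /// Push a timeline by copying its layer files. Because the files are
    /// immutable once written, they can be copied without locking, and a
    /// torn push can be retried file by file.
    fn push_timeline(timeline_dir: &Path, remote: &dyn RemoteStorage) -> anyhow::Result<()> {
        let mut layers: Vec<PathBuf> = std::fs::read_dir(timeline_dir)?
            .map(|entry| Ok(entry?.path()))
            .collect::<anyhow::Result<_>>()?;
        layers.sort(); // deterministic order makes retries easier to reason about
        for path in layers.iter().filter(|p| p.is_file()) {
            remote.upload(path)?;
        }
        Ok(())
    }
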
---
 pageserver/src/layered_repository.rs |  19 +---
 pageserver/src/object_repository.rs  |  36 ------
 pageserver/src/page_service.rs       |  78 +-------------
 pageserver/src/repository.rs         | 139 ---------------------------
 zenith/src/main.rs                   |  30 ------
 5 files changed, 2 insertions(+), 300 deletions(-)

diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs
index ff1a2a247d..1dc5cec82f 100644
--- a/pageserver/src/layered_repository.rs
+++ b/pageserver/src/layered_repository.rs
@@ -28,7 +28,7 @@ use std::sync::{Arc, Mutex};
 use std::time::{Duration, Instant};
 
 use crate::relish::*;
-use crate::repository::{GcResult, History, Repository, Timeline, WALRecord};
+use crate::repository::{GcResult, Repository, Timeline, WALRecord};
 use crate::restore_local_repo::import_timeline_wal;
 use crate::walredo::WalRedoManager;
 use crate::PageServerConf;
@@ -651,11 +651,6 @@ impl Timeline for LayeredTimeline {
         Ok(all_rels)
     }
 
-    fn history<'a>(&'a self) -> Result<Box<dyn History + 'a>> {
-        // This is needed by the push/pull functionality. Not implemented yet.
-        todo!();
-    }
-
     fn put_wal_record(&self, rel: RelishTag, blknum: u32, rec: WALRecord) -> Result<()> {
         if !rel.is_blocky() && blknum != 0 {
             bail!(
@@ -780,18 +775,6 @@ impl Timeline for LayeredTimeline {
         layer.put_page_image(blknum, lsn, img)
     }
 
-    fn put_raw_data(
-        &self,
-        _tag: crate::object_key::ObjectTag,
-        _lsn: Lsn,
-        _data: &[u8],
-    ) -> Result<()> {
-        // FIXME: This doesn't make much sense for the layered storage format,
-        // it's pretty tightly coupled with the way the object store stores
-        // things.
-        bail!("put_raw_data not implemented");
-    }
-
     /// Public entry point for checkpoint(). All the logic is in the private
     /// checkpoint_internal function, this public facade just wraps it for
     /// metrics collection.
diff --git a/pageserver/src/object_repository.rs b/pageserver/src/object_repository.rs
index b5bcdd490d..46e619237a 100644
--- a/pageserver/src/object_repository.rs
+++ b/pageserver/src/object_repository.rs
@@ -523,15 +523,6 @@ impl Timeline for ObjectTimeline {
         Ok(())
     }
 
-    fn put_raw_data(&self, tag: ObjectTag, lsn: Lsn, data: &[u8]) -> Result<()> {
-        let key = ObjectKey {
-            timeline: self.timelineid,
-            tag,
-        };
-        self.obj_store.put(&key, lsn, data)?;
-        Ok(())
-    }
-
     ///
     /// Memorize a full image of a page version
     ///
@@ -706,12 +697,6 @@ impl Timeline for ObjectTimeline {
 
         Ok(())
     }
-
-    fn history<'a>(&'a self) -> Result<Box<dyn History + 'a>> {
-        let lsn = self.last_valid_lsn.load();
-        let iter = self.obj_store.objects(self.timelineid, lsn)?;
-        Ok(Box::new(ObjectHistory { lsn, iter }))
-    }
 }
 
 impl ObjectTimeline {
@@ -1049,27 +1034,6 @@ impl ObjectTimeline {
     }
 }
 
-struct ObjectHistory<'a> {
-    iter: Box<dyn Iterator<Item = Result<(ObjectTag, Lsn, Vec<u8>)>> + 'a>,
-    lsn: Lsn,
-}
-
-impl<'a> Iterator for ObjectHistory<'a> {
-    type Item = Result<Modification>;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        self.iter
-            .next()
-            .map(|result| result.map(|t| Modification::new(t)))
-    }
-}
-
-impl<'a> History for ObjectHistory<'a> {
-    fn lsn(&self) -> Lsn {
-        self.lsn
-    }
-}
-
 ///
 /// We store several kinds of objects in the repository.
 /// We have per-page, per-relation and per-timeline entries.
diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index 2788e81c21..c75c96a5de 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -15,7 +15,6 @@ use bytes::{Buf, BufMut, Bytes, BytesMut};
 use lazy_static::lazy_static;
 use log::*;
 use regex::Regex;
-use std::io::Write;
 use std::net::TcpListener;
 use std::str;
 use std::str::FromStr;
@@ -31,13 +30,12 @@ use zenith_utils::pq_proto::{
     BeMessage, FeMessage, RowDescriptor, HELLO_WORLD_ROW, SINGLE_COL_ROWDESC,
 };
 use zenith_utils::zid::{ZTenantId, ZTimelineId};
-use zenith_utils::{bin_ser::BeSer, lsn::Lsn};
+use zenith_utils::lsn::Lsn;
 
 use crate::basebackup;
 use crate::branches;
 use crate::page_cache;
 use crate::relish::*;
-use crate::repository::Modification;
 use crate::walreceiver;
 use crate::PageServerConf;
 
@@ -526,80 +524,6 @@ impl postgres_backend::Handler for PageServerHandler {
             pgb.write_message_noflush(&SINGLE_COL_ROWDESC)?
                 .write_message_noflush(&BeMessage::DataRow(&[Some(&branch)]))?
                 .write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
-        } else if query_string.starts_with("push ") {
-            // push <tenantid> <timelineid>
-            let re = Regex::new(r"^push ([[:xdigit:]]+) ([[:xdigit:]]+)$").unwrap();
-
-            let caps = re
-                .captures(query_string)
-                .ok_or_else(|| anyhow!("invalid push: '{}'", query_string))?;
-
-            let tenantid = ZTenantId::from_str(caps.get(1).unwrap().as_str())?;
-            let timelineid = ZTimelineId::from_str(caps.get(2).unwrap().as_str())?;
-
-            self.check_permission(Some(tenantid))?;
-
-            let start_lsn = Lsn(0); // TODO this needs to come from the repo
-            let timeline = page_cache::get_repository_for_tenant(&tenantid)?
-                .create_empty_timeline(timelineid, start_lsn)?;
-
-            pgb.write_message(&BeMessage::CopyInResponse)?;
-
-            let mut last_lsn = Lsn(0);
-
-            while let Some(msg) = pgb.read_message()? {
-                match msg {
-                    FeMessage::CopyData(bytes) => {
-                        let modification = Modification::des(&bytes)?;
-
-                        last_lsn = modification.lsn;
-                        timeline.put_raw_data(
-                            modification.tag,
-                            modification.lsn,
-                            &modification.data,
-                        )?;
-                    }
-                    FeMessage::CopyDone => {
-                        timeline.advance_last_valid_lsn(last_lsn);
-                        break;
-                    }
-                    FeMessage::Sync => {}
-                    _ => bail!("unexpected message {:?}", msg),
-                }
-            }
-
-            pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
-        } else if query_string.starts_with("request_push ") {
-            // request_push <tenantid> <timelineid> <postgres connection uri>
-            let re = Regex::new(r"^request_push ([[:xdigit:]]+) ([[:xdigit:]]+) (.*)$").unwrap();
-
-            let caps = re
-                .captures(query_string)
-                .ok_or_else(|| anyhow!("invalid request_push: '{}'", query_string))?;
-
-            let tenantid = ZTenantId::from_str(caps.get(1).unwrap().as_str())?;
-            let timelineid = ZTimelineId::from_str(caps.get(2).unwrap().as_str())?;
-            let postgres_connection_uri = caps.get(3).unwrap().as_str();
-
-            self.check_permission(Some(tenantid))?;
-
-            let timeline =
-                page_cache::get_repository_for_tenant(&tenantid)?.get_timeline(timelineid)?;
-
-            let mut conn = postgres::Client::connect(postgres_connection_uri, postgres::NoTls)?;
-            let mut copy_in = conn.copy_in(format!("push {}", timelineid.to_string()).as_str())?;
-
-            let history = timeline.history()?;
-            for update_res in history {
-                let update = update_res?;
-                let update_bytes = update.ser()?;
-                copy_in.write_all(&update_bytes)?;
-                copy_in.flush()?; // ensure that messages are sent inside individual CopyData packets
-            }
-
-            copy_in.finish()?;
-
-            pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
         } else if query_string.starts_with("branch_list ") {
             // branch_list <tenantid>
             let re = Regex::new(r"^branch_list ([[:xdigit:]]+)$").unwrap();
diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs
index 81ee5029cd..d005615bef 100644
--- a/pageserver/src/repository.rs
+++ b/pageserver/src/repository.rs
@@ -1,10 +1,8 @@
-use crate::object_key::*;
 use crate::relish::*;
 use anyhow::Result;
 use bytes::{Buf, BufMut, Bytes, BytesMut};
 use serde::{Deserialize, Serialize};
 use std::collections::HashSet;
-use std::iter::Iterator;
 use std::ops::AddAssign;
 use std::sync::Arc;
 use std::time::Duration;
@@ -166,9 +164,6 @@ pub trait Timeline: Send + Sync {
     /// This method is used for marking dropped relations and truncated SLRU segments
     fn put_unlink(&self, tag: RelishTag, lsn: Lsn) -> Result<()>;
 
-    /// Put raw data
-    fn put_raw_data(&self, tag: ObjectTag, lsn: Lsn, data: &[u8]) -> Result<()>;
-
     /// Remember that all WAL before the given LSN has been processed.
     ///
     /// The WAL receiver calls this after the put_* functions, to indicate that
@@ -195,40 +190,6 @@ pub trait Timeline: Send + Sync {
     /// NOTE: This has nothing to do with checkpoint in PostgreSQL. We don't
     /// know anything about them here in the repository.
     fn checkpoint(&self) -> Result<()>;
-
-    /// Events for all relations in the timeline.
-    /// Contains updates from start up to the last valid LSN
-    /// at time of history() call. This lsn can be read via the lsn() function.
-    ///
-    /// Relation size is increased implicitly and decreased with Truncate updates.
-    // TODO ordering guarantee?
-    fn history<'a>(&'a self) -> Result<Box<dyn History + 'a>>;
-}
-
-pub trait History: Iterator<Item = Result<Modification>> {
-    /// The last_valid_lsn at the time of history() call.
-    fn lsn(&self) -> Lsn;
-}
-
-//
-// Structure representing any update operation of object storage.
-// It is used to copy object storage content in PUSH method.
-//
-#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
-pub struct Modification {
-    pub tag: ObjectTag,
-    pub lsn: Lsn,
-    pub data: Vec<u8>,
-}
-
-impl Modification {
-    pub fn new(entry: (ObjectTag, Lsn, Vec<u8>)) -> Modification {
-        Modification {
-            tag: entry.0,
-            lsn: entry.1,
-            data: entry.2,
-        }
-    }
 }
 
 #[derive(Clone)]
@@ -281,7 +242,6 @@ mod tests {
     use super::*;
     use crate::layered_repository::LayeredRepository;
     use crate::object_repository::ObjectRepository;
-    use crate::object_repository::{ObjectValue, PageEntry, RelationSizeEntry};
     use crate::rocksdb_storage::RocksObjectStore;
     use crate::walredo::{WalRedoError, WalRedoManager};
     use crate::{PageServerConf, RepositoryFormat};
@@ -290,7 +250,6 @@ mod tests {
     use std::path::PathBuf;
     use std::str::FromStr;
     use std::time::Duration;
-    use zenith_utils::bin_ser::BeSer;
     use zenith_utils::postgres_backend::AuthType;
     use zenith_utils::zid::ZTenantId;
 
@@ -539,18 +498,6 @@ mod tests {
         Ok(())
     }
 
-    fn skip_nonrel_objects<'a>(
-        snapshot: Box<dyn History + 'a>,
-    ) -> Result<impl Iterator<Item = <dyn History as Iterator>::Item> + 'a> {
-        Ok(snapshot.skip_while(|r| match r {
-            Ok(m) => match m.tag {
-                ObjectTag::RelationMetadata(_) => false,
-                _ => true,
-            },
-            _ => panic!("Iteration error"),
-        }))
-    }
-
     #[test]
     fn test_branch_rocksdb() -> Result<()> {
         let repo = get_test_repo("test_branch_rocksdb", RepositoryFormat::RocksDb)?;
@@ -614,92 +561,6 @@ mod tests {
         Ok(())
     }
 
-    #[test]
-    fn test_history_rocksdb() -> Result<()> {
-        let repo = get_test_repo("test_history_rocksdb", RepositoryFormat::RocksDb)?;
-        test_history(&*repo)
-    }
-    #[test]
-    // TODO: This doesn't work with the layered storage, the functions needed for push/pull
-    // functionality haven't been implemented yet.
-    #[ignore]
-    fn test_history_layered() -> Result<()> {
-        let repo = get_test_repo("test_history_layered", RepositoryFormat::Layered)?;
-        test_history(&*repo)
-    }
-    fn test_history(repo: &dyn Repository) -> Result<()> {
-        let timelineid = ZTimelineId::from_str("11223344556677881122334455667788").unwrap();
-        let tline = repo.create_empty_timeline(timelineid, Lsn(0))?;
-
-        let snapshot = tline.history()?;
-        assert_eq!(snapshot.lsn(), Lsn(0));
-        let mut snapshot = skip_nonrel_objects(snapshot)?;
-        assert_eq!(None, snapshot.next().transpose()?);
-
-        // add a page and advance the last valid LSN
-        let rel = TESTREL_A;
-        tline.put_page_image(rel, 1, Lsn(1), TEST_IMG("blk 1 @ lsn 1"), true)?;
-        tline.advance_last_valid_lsn(Lsn(1));
-
-        let expected_page = Modification {
-            tag: ObjectTag::Buffer(rel, 1),
-            lsn: Lsn(1),
-            data: ObjectValue::ser(&ObjectValue::Page(PageEntry::Page(TEST_IMG(
-                "blk 1 @ lsn 1",
-            ))))?,
-        };
-        let expected_init_size = Modification {
-            tag: ObjectTag::RelationMetadata(rel),
-            lsn: Lsn(1),
-            data: ObjectValue::ser(&ObjectValue::RelationSize(RelationSizeEntry::Size(2)))?,
-        };
-        let expected_trunc_size = Modification {
-            tag: ObjectTag::RelationMetadata(rel),
-            lsn: Lsn(2),
-            data: ObjectValue::ser(&ObjectValue::RelationSize(RelationSizeEntry::Size(0)))?,
-        };
-
-        let snapshot = tline.history()?;
-        assert_eq!(snapshot.lsn(), Lsn(1));
-        let mut snapshot = skip_nonrel_objects(snapshot)?;
-        assert_eq!(
-            Some(&expected_init_size),
-            snapshot.next().transpose()?.as_ref()
-        );
-        assert_eq!(Some(&expected_page), snapshot.next().transpose()?.as_ref());
-        assert_eq!(None, snapshot.next().transpose()?);
-
-        // truncate to zero, but don't advance the last valid LSN
-        tline.put_truncation(rel, Lsn(2), 0)?;
-        let snapshot = tline.history()?;
-        assert_eq!(snapshot.lsn(), Lsn(1));
-        let mut snapshot = skip_nonrel_objects(snapshot)?;
-        assert_eq!(
-            Some(&expected_init_size),
-            snapshot.next().transpose()?.as_ref()
-        );
-        assert_eq!(Some(&expected_page), snapshot.next().transpose()?.as_ref());
-        assert_eq!(None, snapshot.next().transpose()?);
-
-        // advance the last valid LSN and the truncation should be observable
-        tline.advance_last_valid_lsn(Lsn(2));
-        let snapshot = tline.history()?;
-        assert_eq!(snapshot.lsn(), Lsn(2));
-        let mut snapshot = skip_nonrel_objects(snapshot)?;
-        assert_eq!(
-            Some(&expected_init_size),
-            snapshot.next().transpose()?.as_ref()
-        );
-        assert_eq!(
-            Some(&expected_trunc_size),
-            snapshot.next().transpose()?.as_ref()
-        );
-        assert_eq!(Some(&expected_page), snapshot.next().transpose()?.as_ref());
-        assert_eq!(None, snapshot.next().transpose()?);
-
-        Ok(())
-    }
-
     // Mock WAL redo manager that doesn't do much
     struct TestRedoManager {}
 
diff --git a/zenith/src/main.rs b/zenith/src/main.rs
index a9ee712eec..f39e1f480a 100644
--- a/zenith/src/main.rs
+++ b/zenith/src/main.rs
@@ -131,12 +131,6 @@ fn main() -> Result<()> {
                 ),
             ),
         )
-        .subcommand(
-            SubCommand::with_name("push")
-                .about("Push timeline to remote pageserver")
-                .arg(Arg::with_name("timeline").required(true))
-                .arg(Arg::with_name("remote").required(true)),
-        )
         .get_matches();
 
     // Create config file
@@ -236,13 +230,6 @@ fn main() -> Result<()> {
             }
         }
 
-        ("push", Some(push_match)) => {
-            if let Err(e) = handle_push(push_match, &env) {
-                eprintln!("push operation failed: {}", e);
-                exit(1);
-            }
-        }
-
         _ => {}
     };
 
@@ -541,20 +528,3 @@ fn handle_remote(remote_match: &ArgMatches, local_env: &LocalEnv) -> Result<()>
 
     Ok(())
 }
-
-fn handle_push(push_match: &ArgMatches, local_env: &LocalEnv) -> Result<()> {
-    let timeline_id_str = push_match.value_of("timeline").unwrap();
-    ZTimelineId::from_str(timeline_id_str)?;
-
-    let remote_name = push_match.value_of("remote").unwrap();
-    let remote = local_env
-        .remotes
-        .get(remote_name)
-        .ok_or_else(|| anyhow!("remote {} not found", remote_name))?;
-
-    let page_server = PageServerNode::from_env(local_env);
-    let mut client = page_server.page_server_psql_client()?;
-    client.simple_query(&format!("request_push {} {}", timeline_id_str, remote))?;
-
-    Ok(())
-}
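
For the record, the wire protocol removed by this patch is easiest to see in
one place. The pageserver-to-pageserver copy ran over the regular libpq
protocol: the receiving side handled a "push" query by entering COPY IN mode
and expected one BeSer-serialized Modification per CopyData message, finishing
on CopyDone. Below is a client-side sketch reconstructed from the deleted
request_push handler; push_history and the Modification stand-in are
illustrative only and do not exist anywhere in the tree:

    use std::io::Write;

    use postgres::{Client, NoTls};

    /// Stand-in for the deleted Modification struct (tag, lsn, data in the
    /// real one); ser() was provided by zenith_utils::bin_ser::BeSer.
    struct Modification;
    impl Modification {
        fn ser(&self) -> anyhow::Result<Vec<u8>> {
            unimplemented!("illustration only")
        }
    }

    /// Client side of the removed protocol: open COPY IN with a "push" query,
    /// stream one serialized Modification per CopyData packet, end with CopyDone.
    fn push_history(
        uri: &str,
        tenantid: &str,
        timelineid: &str,
        history: impl Iterator<Item = anyhow::Result<Modification>>,
    ) -> anyhow::Result<()> {
        let mut conn = Client::connect(uri, NoTls)?;
        // Note: the deleted server-side regex expected both ids here, even
        // though the deleted sender passed only the timeline id.
        let mut copy_in = conn.copy_in(format!("push {} {}", tenantid, timelineid).as_str())?;
        for update in history {
            copy_in.write_all(&update?.ser()?)?;
            copy_in.flush()?; // keep each Modification in its own CopyData packet
        }
        copy_in.finish()?; // sends CopyDone; the receiver then advances last_valid_lsn
        Ok(())
    }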