From 96e73fb58598db119134c3bdde976210c01db332 Mon Sep 17 00:00:00 2001
From: Konstantin Knizhnik
Date: Thu, 22 Jul 2021 10:59:28 +0300
Subject: [PATCH] Add in-memory storage engine-a

---
 pageserver/src/inmem_storage.rs              | 345 +++++++++++++++++++
 test_runner/batch_others/test_bulk_insert.py |  25 ++
 test_runner/batch_others/test_seq_scan.py    |  26 ++
 3 files changed, 396 insertions(+)
 create mode 100644 pageserver/src/inmem_storage.rs
 create mode 100644 test_runner/batch_others/test_bulk_insert.py
 create mode 100644 test_runner/batch_others/test_seq_scan.py

diff --git a/pageserver/src/inmem_storage.rs b/pageserver/src/inmem_storage.rs
new file mode 100644
index 0000000000..5fcc6f092d
--- /dev/null
+++ b/pageserver/src/inmem_storage.rs
@@ -0,0 +1,345 @@
+//!
+//! An implementation of the ObjectStore interface, backed by BTreeMap
+//!
+use crate::object_key::*;
+use crate::object_store::ObjectStore;
+use crate::repository::RelTag;
+use crate::PageServerConf;
+use crate::ZTimelineId;
+use anyhow::{bail, Result};
+use serde::{Deserialize, Serialize};
+use std::collections::{BTreeMap, HashSet};
+use std::fs::File;
+use std::io::prelude::*;
+use std::ops::Bound::*;
+use std::sync::RwLock;
+use zenith_utils::bin_ser::BeSer;
+use zenith_utils::lsn::Lsn;
+
+#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize)]
+pub struct StorageKey {
+    obj_key: ObjectKey,
+    lsn: Lsn,
+}
+
+impl StorageKey {
+    /// The first key for a given timeline
+    fn timeline_start(timeline: ZTimelineId) -> Self {
+        Self {
+            obj_key: ObjectKey {
+                timeline,
+                tag: ObjectTag::FirstTag,
+            },
+            lsn: Lsn(0),
+        }
+    }
+}
+
+pub struct InmemObjectStore {
+    conf: &'static PageServerConf,
+    db: RwLock<BTreeMap<StorageKey, Vec<u8>>>,
+}
+
+impl ObjectStore for InmemObjectStore {
+    fn get(&self, key: &ObjectKey, lsn: Lsn) -> Result<Vec<u8>> {
+        let db = self.db.read().unwrap();
+        let val = db.get(&StorageKey {
+            obj_key: key.clone(),
+            lsn,
+        });
+        if let Some(val) = val {
+            Ok(val.clone())
+        } else {
+            bail!("could not find page {:?}", key);
+        }
+    }
+
+    fn get_next_key(&self, key: &ObjectKey) -> Result<Option<ObjectKey>> {
+        let search_key = StorageKey {
+            obj_key: key.clone(),
+            lsn: Lsn(0),
+        };
+        let db = self.db.read().unwrap();
+        for pair in db.range(&search_key..) {
+            let key = pair.0;
+            return Ok(Some(key.obj_key.clone()));
+        }
+        Ok(None)
+    }
+
+    fn put(&self, key: &ObjectKey, lsn: Lsn, value: &[u8]) -> Result<()> {
+        let mut db = self.db.write().unwrap();
+        db.insert(
+            StorageKey {
+                obj_key: key.clone(),
+                lsn,
+            },
+            value.to_vec(),
+        );
+        Ok(())
+    }
+
+    fn unlink(&self, key: &ObjectKey, lsn: Lsn) -> Result<()> {
+        let mut db = self.db.write().unwrap();
+        db.remove(&StorageKey {
+            obj_key: key.clone(),
+            lsn,
+        });
+        Ok(())
+    }
+
+    /// Iterate through page versions of given page, starting from the given LSN.
+    /// The versions are walked in descending LSN order.
+    fn object_versions<'a>(
+        &'a self,
+        key: &ObjectKey,
+        lsn: Lsn,
+    ) -> Result<Box<dyn Iterator<Item = (Lsn, Vec<u8>)> + 'a>> {
+        let from = StorageKey {
+            obj_key: key.clone(),
+            lsn: Lsn(0),
+        };
+        let till = StorageKey {
+            obj_key: key.clone(),
+            lsn,
+        };
+        let db = self.db.read().unwrap();
+        let versions: Vec<(Lsn, Vec<u8>)> = db
+            .range(from..=till)
+            .map(|pair| (pair.0.lsn, pair.1.clone()))
+            .collect();
+        Ok(Box::new(InmemObjectVersionIter::new(versions)))
+    }
+
+    /// Iterate through all timeline objects
+    fn list_objects<'a>(
+        &'a self,
+        timeline: ZTimelineId,
+        nonrel_only: bool,
+        lsn: Lsn,
+    ) -> Result<Box<dyn Iterator<Item = ObjectTag> + 'a>> {
+        let curr_key = StorageKey::timeline_start(timeline);
+
+        Ok(Box::new(InmemObjectIter {
+            store: &self,
+            curr_key,
+            timeline,
+            nonrel_only,
+            lsn,
+        }))
+    }
+
+    /// Get a list of all distinct relations in given tablespace and database.
+    ///
+    /// TODO: This implementation is very inefficient, it scans
+    /// through all entries in the given database. In practice, this
+    /// is used for CREATE DATABASE, and usually the template database is small.
+    /// But if it's not, this will be slow.
+    fn list_rels(
+        &self,
+        timelineid: ZTimelineId,
+        spcnode: u32,
+        dbnode: u32,
+        lsn: Lsn,
+    ) -> Result<HashSet<RelTag>> {
+        // FIXME: This scans everything. Very slow
+
+        let mut rels: HashSet<RelTag> = HashSet::new();
+
+        let mut search_rel_tag = RelTag {
+            spcnode,
+            dbnode,
+            relnode: 0,
+            forknum: 0u8,
+        };
+        let db = self.db.read().unwrap();
+        'outer: loop {
+            let search_key = StorageKey {
+                obj_key: ObjectKey {
+                    timeline: timelineid,
+                    tag: ObjectTag::RelationMetadata(search_rel_tag),
+                },
+                lsn: Lsn(0),
+            };
+            for pair in db.range(&search_key..) {
+                let key = pair.0;
+
+                if let ObjectTag::RelationMetadata(rel_tag) = key.obj_key.tag {
+                    if spcnode != 0 && rel_tag.spcnode != spcnode
+                        || dbnode != 0 && rel_tag.dbnode != dbnode
+                    {
+                        break 'outer;
+                    }
+                    if key.lsn <= lsn {
+                        // visible in this snapshot
+                        rels.insert(rel_tag);
+                    }
+                    search_rel_tag = rel_tag;
+                    // skip to next relation
+                    // FIXME: What if relnode is u32::MAX ?
+                    search_rel_tag.relnode += 1;
+                    continue 'outer;
+                } else {
+                    // no more relation metadata entries
+                    break 'outer;
+                }
+            }
+        }
+
+        Ok(rels)
+    }
+
+    /// Iterate through versions of all objects in a timeline.
+    ///
+    /// Returns objects in increasing key-version order.
+    /// Returns all versions up to and including the specified LSN.
+    fn objects<'a>(
+        &'a self,
+        timeline: ZTimelineId,
+        lsn: Lsn,
+    ) -> Result<Box<dyn Iterator<Item = Result<(ObjectTag, Lsn, Vec<u8>)>> + 'a>> {
+        let curr_key = StorageKey::timeline_start(timeline);
+
+        Ok(Box::new(InmemObjects {
+            store: &self,
+            curr_key,
+            timeline,
+            lsn,
+        }))
+    }
+
+    fn compact(&self) {}
+}
+
+impl Drop for InmemObjectStore {
+    fn drop(&mut self) {
+        let path = self.conf.workdir.join("objstore.dmp");
+        let mut f = File::create(path).unwrap();
+        f.write(&self.db.ser().unwrap()).unwrap();
+    }
+}
+
+impl InmemObjectStore {
+    pub fn open(conf: &'static PageServerConf) -> Result<InmemObjectStore> {
+        let path = conf.workdir.join("objstore.dmp");
+        let mut f = File::open(path)?;
+        let mut buffer = Vec::new();
+        // read the whole file
+        f.read_to_end(&mut buffer)?;
+        let db = RwLock::new(BTreeMap::des(&buffer)?);
+        Ok(InmemObjectStore { conf: conf, db })
+    }
+
+    pub fn create(conf: &'static PageServerConf) -> Result<InmemObjectStore> {
+        Ok(InmemObjectStore {
+            conf: conf,
+            db: RwLock::new(BTreeMap::new()),
+        })
+    }
+}
+
+///
+/// Iterator for `object_versions`. Returns all page versions of a given block, in
+/// reverse LSN order.
+///
+struct InmemObjectVersionIter {
+    versions: Vec<(Lsn, Vec<u8>)>,
+    curr: usize,
+}
+impl InmemObjectVersionIter {
+    fn new(versions: Vec<(Lsn, Vec<u8>)>) -> InmemObjectVersionIter {
+        let curr = versions.len();
+        InmemObjectVersionIter { versions, curr }
+    }
+}
+impl Iterator for InmemObjectVersionIter {
+    type Item = (Lsn, Vec<u8>);
+
+    fn next(&mut self) -> std::option::Option<Self::Item> {
+        if self.curr == 0 {
+            None
+        } else {
+            self.curr -= 1;
+            Some(self.versions[self.curr].clone())
+        }
+    }
+}
+
+struct InmemObjects<'r> {
+    store: &'r InmemObjectStore,
+    curr_key: StorageKey,
+    timeline: ZTimelineId,
+    lsn: Lsn,
+}
+
+impl<'r> Iterator for InmemObjects<'r> {
+    // TODO consider returning Box<[u8]>
+    type Item = Result<(ObjectTag, Lsn, Vec<u8>)>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.next_result().transpose()
+    }
+}
+
+impl<'r> InmemObjects<'r> {
+    fn next_result(&mut self) -> Result<Option<(ObjectTag, Lsn, Vec<u8>)>> {
+        let db = self.store.db.read().unwrap();
+        for pair in db.range((Excluded(&self.curr_key), Unbounded)) {
+            let key = pair.0;
+            if key.obj_key.timeline != self.timeline {
+                return Ok(None);
+            }
+            if key.lsn > self.lsn {
+                // TODO can speed up by seeking iterator
+                continue;
+            }
+            self.curr_key = key.clone();
+            let value = pair.1.clone();
+            return Ok(Some((key.obj_key.tag, key.lsn, value)));
+        }
+        Ok(None)
+    }
+}
+
+///
+/// Iterator for `list_objects`. Returns all objects preceding specified LSN
+///
+struct InmemObjectIter<'a> {
+    store: &'a InmemObjectStore,
+    curr_key: StorageKey,
+    timeline: ZTimelineId,
+    nonrel_only: bool,
+    lsn: Lsn,
+}
+
+impl<'a> Iterator for InmemObjectIter<'a> {
+    type Item = ObjectTag;
+
+    fn next(&mut self) -> std::option::Option<Self::Item> {
+        let db = self.store.db.read().unwrap();
+        'outer: loop {
+            for pair in db.range((Excluded(&self.curr_key), Unbounded)) {
+                let key = pair.0;
+                if key.obj_key.timeline != self.timeline {
+                    return None;
+                }
+                self.curr_key = key.clone();
+                self.curr_key.lsn = Lsn(u64::MAX); // next seek should skip all versions
+                if key.lsn <= self.lsn {
+                    // visible in this snapshot
+                    if self.nonrel_only {
+                        match key.obj_key.tag {
+                            ObjectTag::RelationMetadata(_) => return None,
+                            ObjectTag::RelationBuffer(_) => return None,
+                            _ => return Some(key.obj_key.tag),
+                        }
+                    } else {
+                        return Some(key.obj_key.tag);
+                    }
+                }
+                continue 'outer;
+            }
+            return None;
+        }
+    }
+}
diff --git a/test_runner/batch_others/test_bulk_insert.py b/test_runner/batch_others/test_bulk_insert.py
new file mode 100644
index 0000000000..75f0e15c8c
--- /dev/null
+++ b/test_runner/batch_others/test_bulk_insert.py
@@ -0,0 +1,25 @@
+from contextlib import closing
+import psycopg2.extras
+
+pytest_plugins = ("fixtures.zenith_fixtures")
+
+#
+# Test insertion of a large number of records
+#
+# This test is pretty tightly coupled with the current implementation of page version storage
+# and garbage collection in object_repository.rs.
+#
+def test_bulk_insert(zenith_cli, pageserver, postgres, pg_bin):
+    zenith_cli.run(["branch", "test_bulk_insert", "empty"])
+    pg = postgres.create_start('test_bulk_insert')
+
+    with closing(pg.connect()) as conn:
+        with conn.cursor() as cur:
+            cur.execute("create table t(c1 bigint, c2 bigint, c3 bigint, c4 bigint, c5 bigint)")
+            cur.execute("create index on t(c1)")
+            cur.execute("create index on t(c2)")
+            cur.execute("create index on t(c3)")
+            cur.execute("create index on t(c4)")
+            cur.execute("create index on t(c5)")
+            cur.execute("insert into t values (generate_series(1,1000000),generate_series(1,1000000),generate_series(1,1000000),generate_series(1,1000000),generate_series(1,1000000))")
+            cur.execute("insert into t values (generate_series(1,1000000),random()*1000000,random()*1000000,random()*1000000,random()*1000000)")
diff --git a/test_runner/batch_others/test_seq_scan.py b/test_runner/batch_others/test_seq_scan.py
new file mode 100644
index 0000000000..ed1ee08d70
--- /dev/null
+++ b/test_runner/batch_others/test_seq_scan.py
@@ -0,0 +1,26 @@
+from contextlib import closing
+import psycopg2.extras
+import time
+
+pytest_plugins = ("fixtures.zenith_fixtures")
+
+#
+# Test sequential scan over a large number of records
+#
+# This test is pretty tightly coupled with the current implementation of page version storage
+# and garbage collection in object_repository.rs.
+#
+def test_seq_scan(zenith_cli, pageserver, postgres, pg_bin):
+    zenith_cli.run(["branch", "test_seq_scan", "empty"])
+    pg = postgres.create_start('test_seq_scan')
+
+    with closing(pg.connect()) as conn:
+        with conn.cursor() as cur:
+            cur.execute("create table t(c1 bigint, c2 bigint, c3 bigint, c4 bigint, c5 bigint)")
+            cur.execute("insert into t values (generate_series(1,1000000),generate_series(1,1000000),generate_series(1,1000000),generate_series(1,1000000),generate_series(1,1000000))")
+            cur.execute("set max_parallel_workers_per_gather=0")
+            for i in range(100):
+                start = time.time()
+                cur.execute("select count(*) from t")
+                stop = time.time()
+                print(f'Elapsed time for iterating through 1000000 records is {stop - start}')
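
Reviewer note, not part of the patch: a minimal sketch of how the new engine's ObjectStore methods can be exercised. It assumes the pageserver types used above (PageServerConf, ObjectKey, Lsn, the ObjectStore trait); the helper function name and sample LSNs/values are hypothetical.

    use crate::object_store::ObjectStore;
    use anyhow::Result;

    // Hypothetical smoke test: store two versions of one object and read them back.
    fn inmem_smoke_test(conf: &'static PageServerConf, key: ObjectKey) -> Result<()> {
        let store = InmemObjectStore::create(conf)?;

        // Two versions of the same object at increasing LSNs.
        store.put(&key, Lsn(0x10), b"v1")?;
        store.put(&key, Lsn(0x20), b"v2")?;

        // `get` looks up an exact (key, LSN) pair in the BTreeMap.
        assert_eq!(store.get(&key, Lsn(0x20))?, b"v2".to_vec());

        // `object_versions` walks all versions at or below the given LSN,
        // newest first (descending LSN order).
        for (lsn, val) in store.object_versions(&key, Lsn(0x20))? {
            println!("{:?} -> {} bytes", lsn, val.len());
        }
        Ok(())
    }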