From 572ae743883df19f5ca9f32d7cdce7a7ca5cca4f Mon Sep 17 00:00:00 2001
From: Konstantin Knizhnik
Date: Thu, 21 Jul 2022 07:45:11 +0300
Subject: [PATCH] More precisely control size of inmem layer (#1927)

* More precisely control size of inmem layer

* Force recompaction of L0 layers if they contain large non-WAL-logged BLOBs, to avoid too-large layers

* Add a modified version of the test_hot_update test (test_dup_key.py) which should generate large layers without a large number of tables

* Change test name in test_dup_key

* Add Layer::get_max_key_range function

* Add Layer::key_iter method and implement a new approach to splitting layers during compaction, based on the total size of all key values

* Add test_large_schema test for checking layer file size after compaction

* Make clippy happy

* Restore checking the LSN distance threshold for checkpointing the in-memory layer

* Optimize storage keys iterator

* Update pageserver/src/layered_repository.rs

Co-authored-by: Heikki Linnakangas

* Update pageserver/src/layered_repository.rs

Co-authored-by: Heikki Linnakangas

* Update pageserver/src/layered_repository.rs

Co-authored-by: Heikki Linnakangas

* Update pageserver/src/layered_repository.rs

Co-authored-by: Heikki Linnakangas

* Update pageserver/src/layered_repository.rs

Co-authored-by: Heikki Linnakangas

* Fix code style

* Reduce the number of tables in test_large_schema to make it fit in the timeout with a debug build

* Fix style of test_large_schema.py

* Fix handling of duplicate layers

Co-authored-by: Heikki Linnakangas
---
 pageserver/src/layered_repository.rs          | 176 ++++++++++++++----
 .../src/layered_repository/delta_layer.rs     |  84 +++++++++
 .../src/layered_repository/ephemeral_file.rs  |   2 +-
 .../src/layered_repository/inmemory_layer.rs  |   8 +
 .../src/layered_repository/storage_layer.rs   |   6 +
 test_runner/batch_others/test_large_schema.py |  82 ++++++++
 test_runner/performance/test_dup_key.py       |  48 +++++
 7 files changed, 372 insertions(+), 34 deletions(-)
 create mode 100644 test_runner/batch_others/test_large_schema.py
 create mode 100644 test_runner/performance/test_dup_key.py

diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs
index 93acce912c..3830e4c1bd 100644
--- a/pageserver/src/layered_repository.rs
+++ b/pageserver/src/layered_repository.rs
@@ -1734,30 +1734,43 @@ impl LayeredTimeline {
     ///
     pub fn check_checkpoint_distance(self: &Arc) -> Result<()> {
         let last_lsn = self.get_last_record_lsn();
+        let layers = self.layers.read().unwrap();
+        if let Some(open_layer) = &layers.open_layer {
+            let open_layer_size = open_layer.size()?;
+            drop(layers);
+            let distance = last_lsn.widening_sub(self.last_freeze_at.load());
+            // Checkpointing the open layer can be triggered by layer size or LSN range.
+            // S3 has a 5 GB limit on the size of one upload (without multi-part upload), and
+            // we want to stay below that with a big margin. The LSN distance determines how
+            // much WAL the safekeepers need to store.
+            if distance >= self.get_checkpoint_distance().into()
+                || open_layer_size > self.get_checkpoint_distance()
+            {
+                info!(
+                    "check_checkpoint_distance {}, layer size {}",
+                    distance, open_layer_size
+                );
 
-        // Has more than 'checkpoint_distance' of WAL been accumulated?
-        let distance = last_lsn.widening_sub(self.last_freeze_at.load());
-        if distance >= self.get_checkpoint_distance().into() {
-            // Yes. Freeze the current in-memory layer.
- self.freeze_inmem_layer(true); - self.last_freeze_at.store(last_lsn); + self.freeze_inmem_layer(true); + self.last_freeze_at.store(last_lsn); - // Launch a thread to flush the frozen layer to disk, unless - // a thread was already running. (If the thread was running - // at the time that we froze the layer, it must've seen the - // the layer we just froze before it exited; see comments - // in flush_frozen_layers()) - if let Ok(guard) = self.layer_flush_lock.try_lock() { - drop(guard); - let self_clone = Arc::clone(self); - thread_mgr::spawn( - thread_mgr::ThreadKind::LayerFlushThread, - Some(self.tenant_id), - Some(self.timeline_id), - "layer flush thread", - false, - move || self_clone.flush_frozen_layers(false), - )?; + // Launch a thread to flush the frozen layer to disk, unless + // a thread was already running. (If the thread was running + // at the time that we froze the layer, it must've seen the + // the layer we just froze before it exited; see comments + // in flush_frozen_layers()) + if let Ok(guard) = self.layer_flush_lock.try_lock() { + drop(guard); + let self_clone = Arc::clone(self); + thread_mgr::spawn( + thread_mgr::ThreadKind::LayerFlushThread, + Some(self.tenant_id), + Some(self.timeline_id), + "layer flush thread", + false, + move || self_clone.flush_frozen_layers(false), + )?; + } } } Ok(()) @@ -2211,9 +2224,59 @@ impl LayeredTimeline { } }); + // This iterator walks through all keys and is needed to calculate size used by each key + let mut all_keys_iter = deltas_to_compact + .iter() + .map(|l| l.key_iter()) + .kmerge_by(|a, b| { + let (a_key, a_lsn, _) = a; + let (b_key, b_lsn, _) = b; + match a_key.cmp(b_key) { + Ordering::Less => true, + Ordering::Equal => a_lsn <= b_lsn, + Ordering::Greater => false, + } + }); + // Merge the contents of all the input delta layers into a new set // of delta layers, based on the current partitioning. // + // We split the new delta layers on the key dimension. We iterate through the key space, and for each key, check if including the next key to the current output layer we're building would cause the layer to become too large. If so, dump the current output layer and start new one. + // It's possible that there is a single key with so many page versions that storing all of them in a single layer file + // would be too large. In that case, we also split on the LSN dimension. + // + // LSN + // ^ + // | + // | +-----------+ +--+--+--+--+ + // | | | | | | | | + // | +-----------+ | | | | | + // | | | | | | | | + // | +-----------+ ==> | | | | | + // | | | | | | | | + // | +-----------+ | | | | | + // | | | | | | | | + // | +-----------+ +--+--+--+--+ + // | + // +--------------> key + // + // + // If one key (X) has a lot of page versions: + // + // LSN + // ^ + // | (X) + // | +-----------+ +--+--+--+--+ + // | | | | | | | | + // | +-----------+ | | +--+ | + // | | | | | | | | + // | +-----------+ ==> | | | | | + // | | | | | +--+ | + // | +-----------+ | | | | | + // | | | | | | | | + // | +-----------+ +--+--+--+--+ + // | + // +--------------> key // TODO: this actually divides the layers into fixed-size chunks, not // based on the partitioning. 
// @@ -2222,29 +2285,76 @@ impl LayeredTimeline { let mut new_layers = Vec::new(); let mut prev_key: Option = None; let mut writer: Option = None; + let mut key_values_total_size = 0u64; + let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key + let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key for x in all_values_iter { let (key, lsn, value) = x?; - - if let Some(prev_key) = prev_key { - if key != prev_key && writer.is_some() { - let size = writer.as_mut().unwrap().size(); - if size > target_file_size { - new_layers.push(writer.take().unwrap().finish(prev_key.next())?); + let same_key = prev_key.map_or(false, |prev_key| prev_key == key); + // We need to check key boundaries once we reach next key or end of layer with the same key + if !same_key || lsn == dup_end_lsn { + let mut next_key_size = 0u64; + let is_dup_layer = dup_end_lsn.is_valid(); + dup_start_lsn = Lsn::INVALID; + if !same_key { + dup_end_lsn = Lsn::INVALID; + } + // Determine size occupied by this key. We stop at next key, or when size becomes larger than target_file_size + for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() { + next_key_size = next_size; + if key != next_key { + if dup_end_lsn.is_valid() { + dup_start_lsn = dup_end_lsn; + dup_end_lsn = lsn_range.end; + } + break; + } + key_values_total_size += next_size; + if key_values_total_size > target_file_size { + // split key between multiple layers: such layer can contain only single key + dup_start_lsn = if dup_end_lsn.is_valid() { + dup_end_lsn + } else { + lsn + }; + dup_end_lsn = next_lsn; + break; + } + } + // handle case when loop reaches last key + if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() { + dup_start_lsn = dup_end_lsn; + dup_end_lsn = lsn_range.end; + } + if writer.is_some() { + let written_size = writer.as_mut().unwrap().size(); + // check if key cause layer overflow + if is_dup_layer + || dup_end_lsn.is_valid() + || written_size + key_values_total_size > target_file_size + { + new_layers.push(writer.take().unwrap().finish(prev_key.unwrap().next())?); writer = None; } } + key_values_total_size = next_key_size; } - if writer.is_none() { writer = Some(DeltaLayerWriter::new( self.conf, self.timeline_id, self.tenant_id, key, - lsn_range.clone(), + if dup_end_lsn.is_valid() { + // this is a layer containing slice of values of the same key + debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn); + dup_start_lsn..dup_end_lsn + } else { + debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end); + lsn_range.clone() + }, )?); } - writer.as_mut().unwrap().put_value(key, lsn, value)?; prev_key = Some(key); } @@ -2276,12 +2386,12 @@ impl LayeredTimeline { // Now that we have reshuffled the data to set of new delta layers, we can // delete the old ones let mut layer_paths_do_delete = HashSet::with_capacity(deltas_to_compact.len()); - for l in deltas_to_compact { + for l in &deltas_to_compact { l.delete()?; if let Some(path) = l.local_path() { layer_paths_do_delete.insert(path); } - layers.remove_historic(l); + layers.remove_historic(l.clone()); } drop(layers); diff --git a/pageserver/src/layered_repository/delta_layer.rs b/pageserver/src/layered_repository/delta_layer.rs index ed342c0cca..d622df531a 100644 --- a/pageserver/src/layered_repository/delta_layer.rs +++ b/pageserver/src/layered_repository/delta_layer.rs @@ -316,6 +316,18 @@ impl Layer for DeltaLayer { } } + fn key_iter<'a>(&'a self) -> Box + 'a> { + let inner = match 
self.load() { + Ok(inner) => inner, + Err(e) => panic!("Failed to load a delta layer: {e:?}"), + }; + + match DeltaKeyIter::new(inner) { + Ok(iter) => Box::new(iter), + Err(e) => panic!("Layer index is corrupted: {e:?}"), + } + } + fn delete(&self) -> Result<()> { // delete underlying file fs::remove_file(self.path())?; @@ -822,3 +834,75 @@ impl<'a> DeltaValueIter<'a> { } } } +/// +/// Iterator over all keys stored in a delta layer +/// +/// FIXME: This creates a Vector to hold all keys. +/// That takes up quite a lot of memory. Should do this in a more streaming +/// fashion. +/// +struct DeltaKeyIter { + all_keys: Vec<(DeltaKey, u64)>, + next_idx: usize, +} + +impl Iterator for DeltaKeyIter { + type Item = (Key, Lsn, u64); + + fn next(&mut self) -> Option { + if self.next_idx < self.all_keys.len() { + let (delta_key, size) = &self.all_keys[self.next_idx]; + + let key = delta_key.key(); + let lsn = delta_key.lsn(); + + self.next_idx += 1; + Some((key, lsn, *size)) + } else { + None + } + } +} + +impl<'a> DeltaKeyIter { + fn new(inner: RwLockReadGuard<'a, DeltaLayerInner>) -> Result { + let file = inner.file.as_ref().unwrap(); + let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new( + inner.index_start_blk, + inner.index_root_blk, + file, + ); + + let mut all_keys: Vec<(DeltaKey, u64)> = Vec::new(); + tree_reader.visit( + &[0u8; DELTA_KEY_SIZE], + VisitDirection::Forwards, + |key, value| { + let delta_key = DeltaKey::from_slice(key); + let pos = BlobRef(value).pos(); + if let Some(last) = all_keys.last_mut() { + if last.0.key() == delta_key.key() { + return true; + } else { + // subtract offset of new key BLOB and first blob of this key + // to get total size if values associated with this key + let first_pos = last.1; + last.1 = pos - first_pos; + } + } + all_keys.push((delta_key, pos)); + true + }, + )?; + if let Some(last) = all_keys.last_mut() { + // Last key occupies all space till end of layer + last.1 = std::fs::metadata(&file.file.path)?.len() - last.1; + } + let iter = DeltaKeyIter { + all_keys, + next_idx: 0, + }; + + Ok(iter) + } +} diff --git a/pageserver/src/layered_repository/ephemeral_file.rs b/pageserver/src/layered_repository/ephemeral_file.rs index cdde9d5d13..299bb4e873 100644 --- a/pageserver/src/layered_repository/ephemeral_file.rs +++ b/pageserver/src/layered_repository/ephemeral_file.rs @@ -43,7 +43,7 @@ pub struct EphemeralFile { _timelineid: ZTimelineId, file: Arc, - size: u64, + pub size: u64, } impl EphemeralFile { diff --git a/pageserver/src/layered_repository/inmemory_layer.rs b/pageserver/src/layered_repository/inmemory_layer.rs index 87e6877520..1f89f333dd 100644 --- a/pageserver/src/layered_repository/inmemory_layer.rs +++ b/pageserver/src/layered_repository/inmemory_layer.rs @@ -233,6 +233,14 @@ impl Layer for InMemoryLayer { } impl InMemoryLayer { + /// + /// Get layer size on the disk + /// + pub fn size(&self) -> Result { + let inner = self.inner.read().unwrap(); + Ok(inner.file.size) + } + /// /// Create a new, empty, in-memory layer /// diff --git a/pageserver/src/layered_repository/storage_layer.rs b/pageserver/src/layered_repository/storage_layer.rs index aaf765b83d..e10330bdd3 100644 --- a/pageserver/src/layered_repository/storage_layer.rs +++ b/pageserver/src/layered_repository/storage_layer.rs @@ -139,6 +139,12 @@ pub trait Layer: Send + Sync { /// Iterate through all keys and values stored in the layer fn iter(&self) -> Box> + '_>; + /// Iterate through all keys stored in the layer. 
Returns key, lsn and value size.
+    /// It is used only for compaction, so it is currently implemented only for DeltaLayer.
+    fn key_iter(&self) -> Box<dyn Iterator<Item = (Key, Lsn, u64)> + '_> {
+        panic!("Not implemented")
+    }
+
     /// Permanently remove this layer from disk.
     fn delete(&self) -> Result<()>;
 
diff --git a/test_runner/batch_others/test_large_schema.py b/test_runner/batch_others/test_large_schema.py
new file mode 100644
index 0000000000..18ae0614a9
--- /dev/null
+++ b/test_runner/batch_others/test_large_schema.py
@@ -0,0 +1,82 @@
+import time
+import os
+from fixtures.neon_fixtures import NeonEnvBuilder
+from fixtures.log_helper import log
+
+
+# This test creates a large number of tables, which produces a large catalog.
+# Right now Neon serializes a directory as a single key-value storage entry, so
+# this leads to a layer filled mostly by one key.
+# Originally, the Neon implementation of checkpoint and compaction was not able to split a key, which leads
+# to large (several gigabyte) layer files (both ephemeral and delta layers).
+# That may cause problems with uploading to S3 and also degrade performance because of ephemeral file swapping.
+#
+def test_large_schema(neon_env_builder: NeonEnvBuilder):
+    env = neon_env_builder.init_start()
+
+    pg = env.postgres.create_start('main')
+
+    conn = pg.connect()
+    cur = conn.cursor()
+
+    tables = 2  # 10 is too much for debug build
+    partitions = 1000
+    for i in range(1, tables + 1):
+        print(f'iteration {i} / {tables}')
+
+        # Restart compute. The restart is actually not strictly needed;
+        # it is done mostly because this test originally tried to model the problem reported by Ketteq.
+        pg.stop()
+        # Kill and restart the pageserver.
+        # env.pageserver.stop(immediate=True)
+        # env.pageserver.start()
+        pg.start()
+
+        retry_sleep = 0.5
+        max_retries = 200
+        retries = 0
+        while True:
+            try:
+                conn = pg.connect()
+                cur = conn.cursor()
+                cur.execute(f"CREATE TABLE if not exists t_{i}(pk integer) partition by range (pk)")
+                for j in range(1, partitions + 1):
+                    cur.execute(
+                        f"create table if not exists p_{i}_{j} partition of t_{i} for values from ({j}) to ({j + 1})"
+                    )
+                cur.execute(f"insert into t_{i} values (generate_series(1,{partitions}))")
+                cur.execute("vacuum full")
+                conn.close()
+
+            except Exception as error:
+                # It's normal that it takes some time for the pageserver to
+                # restart, and for the connection to fail until it does. It
+                # should eventually recover, so retry until it succeeds.
+ print(f'failed: {error}') + if retries < max_retries: + retries += 1 + print(f'retry {retries} / {max_retries}') + time.sleep(retry_sleep) + continue + else: + raise + break + + conn = pg.connect() + cur = conn.cursor() + + for i in range(1, tables + 1): + cur.execute(f"SELECT count(*) FROM t_{i}") + assert cur.fetchone() == (partitions, ) + + cur.execute("set enable_sort=off") + cur.execute("select * from pg_depend order by refclassid, refobjid, refobjsubid") + + # Check layer file sizes + tenant_id = pg.safe_psql("show neon.tenant_id")[0][0] + timeline_id = pg.safe_psql("show neon.timeline_id")[0][0] + timeline_path = "{}/tenants/{}/timelines/{}/".format(env.repo_dir, tenant_id, timeline_id) + for filename in os.listdir(timeline_path): + if filename.startswith('00000'): + log.info(f'layer {filename} size is {os.path.getsize(timeline_path + filename)}') + assert os.path.getsize(timeline_path + filename) < 512_000_000 diff --git a/test_runner/performance/test_dup_key.py b/test_runner/performance/test_dup_key.py new file mode 100644 index 0000000000..a8caceb61a --- /dev/null +++ b/test_runner/performance/test_dup_key.py @@ -0,0 +1,48 @@ +import pytest +from contextlib import closing +from fixtures.compare_fixtures import PgCompare +from pytest_lazyfixture import lazy_fixture # type: ignore + + +@pytest.mark.parametrize( + "env", + [ + # The test is too slow to run in CI, but fast enough to run with remote tests + pytest.param(lazy_fixture("neon_compare"), id="neon", marks=pytest.mark.slow), + pytest.param(lazy_fixture("vanilla_compare"), id="vanilla", marks=pytest.mark.slow), + pytest.param(lazy_fixture("remote_compare"), id="remote", marks=pytest.mark.remote_cluster), + ]) +def test_dup_key(env: PgCompare): + # Update the same page many times, then measure read performance + + with closing(env.pg.connect()) as conn: + with conn.cursor() as cur: + cur.execute("SET synchronous_commit=off") + cur.execute("SET statement_timeout=0") + + # Write many updates to the same row + with env.record_duration('write'): + cur.execute("create table t (i integer, filler text);") + cur.execute('insert into t values (0);') + cur.execute(""" +do $$ +begin + for ivar in 1..5000000 loop + update t set i = ivar, filler = repeat('a', 50); + update t set i = ivar, filler = repeat('b', 50); + update t set i = ivar, filler = repeat('c', 50); + update t set i = ivar, filler = repeat('d', 50); + rollback; + end loop; +end; +$$; +""") + + # Write 3-4 MB to evict t from compute cache + cur.execute('create table f (i integer);') + cur.execute(f'insert into f values (generate_series(1,100000));') + + # Read + with env.record_duration('read'): + cur.execute('select * from t;') + cur.fetchall()
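
The check_checkpoint_distance() hunk above makes freezing of the open in-memory layer depend on two independent triggers: the LSN distance accumulated since the last freeze, and the on-disk size of the layer's backing ephemeral file. The following standalone Rust sketch restates only that trigger condition; it is illustrative, not part of the patch, and the function name, the plain u64 arguments, and the main() example are hypothetical simplifications of the Lsn/LayeredTimeline types used in the real code.

// A minimal sketch of the freeze trigger, assuming plain u64 stand-ins for
// LSN distances and byte sizes (the real code uses Lsn::widening_sub and the
// open layer's ephemeral file size).

/// Decide whether the open in-memory layer should be frozen and flushed.
fn should_freeze_open_layer(
    lsn_distance: u64,     // last_record_lsn minus last_freeze_at
    open_layer_size: u64,  // bytes written to the layer's ephemeral file
    checkpoint_distance: u64,
) -> bool {
    // Freeze on WAL distance (bounds what the safekeepers must retain) or on
    // layer size (keeps a single upload well below the S3 5 GB single-PUT limit).
    lsn_distance >= checkpoint_distance || open_layer_size > checkpoint_distance
}

fn main() {
    // A 300 MB open layer trips the size trigger even with no LSN progress.
    assert!(should_freeze_open_layer(0, 300_000_000, 256 * 1024 * 1024));
    // A small, young layer trips neither trigger.
    assert!(!should_freeze_open_layer(1024, 1024, 256 * 1024 * 1024));
}

Reusing the single checkpoint_distance setting for both dimensions keeps the configuration surface small: the size check bounds any one layer upload, while the LSN check bounds how much WAL the safekeepers have to keep around.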
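
The new compaction loop packs the merged (key, lsn, value) stream into output delta layers by size: it uses key_iter() to learn the total size of each key's values, starts a new output layer when including the next key would push the output past target_file_size, and, when a single key's versions alone exceed the target, splits that key across several key-dedicated "dup" layers along the LSN axis. The sketch below shows that planning logic in isolation; it is a simplification under assumed types (u64 stand-ins for Key and Lsn, a PlannedLayer struct instead of DeltaLayerWriter) and is not the code from the patch.

// Illustrative sketch only: size-driven splitting of compaction output.
// `values` plays the role of the merged all_values_iter / all_keys_iter streams.

/// One planned output layer: the (key, lsn) pairs it will contain and its size.
#[derive(Debug, Default)]
struct PlannedLayer {
    entries: Vec<(u64, u64)>,
    size: u64,
}

/// `values` must be sorted by (key, lsn); each element is (key, lsn, value_size).
fn plan_layers(values: &[(u64, u64, u64)], target_file_size: u64) -> Vec<PlannedLayer> {
    let mut layers = Vec::new();
    let mut current = PlannedLayer::default();
    let mut i = 0;
    while i < values.len() {
        let key = values[i].0;
        // Gather all versions of this key and their total size.
        let mut j = i;
        let mut key_size = 0;
        while j < values.len() && values[j].0 == key {
            key_size += values[j].2;
            j += 1;
        }
        if key_size > target_file_size {
            // Oversized key: flush whatever we were building, then cut this key's
            // versions into LSN slices, each going into a layer of its own.
            if !current.entries.is_empty() {
                layers.push(std::mem::take(&mut current));
            }
            for &(k, lsn, sz) in &values[i..j] {
                if current.size + sz > target_file_size && !current.entries.is_empty() {
                    layers.push(std::mem::take(&mut current));
                }
                current.entries.push((k, lsn));
                current.size += sz;
            }
            layers.push(std::mem::take(&mut current));
        } else {
            // Normal key: start a new layer first if this key would not fit.
            if current.size + key_size > target_file_size && !current.entries.is_empty() {
                layers.push(std::mem::take(&mut current));
            }
            for &(k, lsn, sz) in &values[i..j] {
                current.entries.push((k, lsn));
                current.size += sz;
            }
        }
        i = j;
    }
    if !current.entries.is_empty() {
        layers.push(current);
    }
    layers
}

fn main() {
    // Key 7 is "hot": its versions alone exceed the 100-byte target and are split by LSN.
    let values = vec![
        (1, 10, 40), (2, 10, 40),
        (7, 10, 60), (7, 20, 60), (7, 30, 60),
        (9, 10, 40),
    ];
    for (n, layer) in plan_layers(&values, 100).iter().enumerate() {
        println!("layer {n}: {:?} ({} bytes)", layer.entries, layer.size);
    }
}

Running the example packs keys 1 and 2 into one layer, splits the hot key 7 into three single-key layers, and puts key 9 into a final layer, which mirrors the second ASCII diagram in the patch where one key with many page versions is split along the LSN dimension.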