diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index 1d8a58a323..d4d0fdaaf5 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -42,6 +42,18 @@ use super::{ #[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)] pub(crate) struct InMemoryLayerFileId(page_cache::FileId); +fn materialize_key(prefix: &Key, blkno: u32) -> Key { + let mut key = prefix.clone(); + key.field6 = blkno; + key +} + +fn key_to_prefix(key: &Key) -> Key { + let mut prefix = key.clone(); + prefix.field6 = 0; + prefix +} + pub struct InMemoryLayer { conf: &'static PageServerConf, tenant_shard_id: TenantShardId, @@ -83,7 +95,7 @@ pub struct InMemoryLayerInner { /// All versions of all pages in the layer are kept here. Indexed /// by block number and LSN. The value is an offset into the /// ephemeral file where the page version is stored. - index: BTreeMap>, + index: BTreeMap>>, /// The values are stored in a serialized format in this file. /// Each serialized Value is preceded by a 'u32' length field. @@ -274,30 +286,34 @@ impl InMemoryLayer { let cursor = inner.file.block_cursor(); let mut buf = Vec::new(); - for (key, vec_map) in inner.index.iter() { - for (lsn, pos) in vec_map.as_slice() { - let mut desc = String::new(); - cursor.read_blob_into_buf(*pos, &mut buf, ctx).await?; - let val = Value::des(&buf); - match val { - Ok(Value::Image(img)) => { - write!(&mut desc, " img {} bytes", img.len())?; - } - Ok(Value::WalRecord(rec)) => { - let wal_desc = walrecord::describe_wal_record(&rec).unwrap(); - write!( - &mut desc, - " rec {} bytes will_init: {} {}", - buf.len(), - rec.will_init(), - wal_desc - )?; - } - Err(err) => { - write!(&mut desc, " DESERIALIZATION ERROR: {}", err)?; + for (key_prefix, inner) in inner.index.iter() { + for (blkno, vec_map) in inner { + let key = materialize_key(key_prefix, *blkno); + + for (lsn, pos) in vec_map.as_slice() { + let mut desc = String::new(); + cursor.read_blob_into_buf(*pos, &mut buf, ctx).await?; + let val = Value::des(&buf); + match val { + Ok(Value::Image(img)) => { + write!(&mut desc, " img {} bytes", img.len())?; + } + Ok(Value::WalRecord(rec)) => { + let wal_desc = walrecord::describe_wal_record(&rec).unwrap(); + write!( + &mut desc, + " rec {} bytes will_init: {} {}", + buf.len(), + rec.will_init(), + wal_desc + )?; + } + Err(err) => { + write!(&mut desc, " DESERIALIZATION ERROR: {}", err)?; + } } + println!(" key {} at {}: {}", key, lsn, desc); } - println!(" key {} at {}: {}", key, lsn, desc); } } @@ -324,23 +340,25 @@ impl InMemoryLayer { let reader = inner.file.block_cursor(); // Scan the page versions backwards, starting from `lsn`. - if let Some(vec_map) = inner.index.get(&key) { - let slice = vec_map.slice_range(lsn_range); - for (entry_lsn, pos) in slice.iter().rev() { - let buf = reader.read_blob(*pos, &ctx).await?; - let value = Value::des(&buf)?; - match value { - Value::Image(img) => { - reconstruct_state.img = Some((*entry_lsn, img)); - return Ok(ValueReconstructResult::Complete); - } - Value::WalRecord(rec) => { - let will_init = rec.will_init(); - reconstruct_state.records.push((*entry_lsn, rec)); - if will_init { - // This WAL record initializes the page, so no need to go further back - need_image = false; - break; + if let Some(inner) = inner.index.get(&key_to_prefix(&key)) { + if let Some(vec_map) = inner.get(&key.field6) { + let slice = vec_map.slice_range(lsn_range); + for (entry_lsn, pos) in slice.iter().rev() { + let buf = reader.read_blob(*pos, &ctx).await?; + let value = Value::des(&buf)?; + match value { + Value::Image(img) => { + reconstruct_state.img = Some((*entry_lsn, img)); + return Ok(ValueReconstructResult::Complete); + } + Value::WalRecord(rec) => { + let will_init = rec.will_init(); + reconstruct_state.records.push((*entry_lsn, rec)); + if will_init { + // This WAL record initializes the page, so no need to go further back + need_image = false; + break; + } } } } @@ -377,34 +395,54 @@ impl InMemoryLayer { let reader = inner.file.block_cursor(); for range in keyspace.ranges.iter() { - for (key, vec_map) in inner.index.range(range.start..range.end) { - let lsn_range = match reconstruct_state.get_cached_lsn(key) { - Some(cached_lsn) => (cached_lsn + 1)..end_lsn, - None => self.start_lsn..end_lsn, + let range_incl = range.start..=Key::from_i128(Key::to_i128(&range.end) - 1); + + let prefix_start = key_to_prefix(&range.start); + let prefix_end = key_to_prefix(&range.end); + + for (prefix, relation_idx) in inner.index.range(prefix_start..=prefix_end) { + let blkno_start = if prefix == &key_to_prefix(&range_incl.start()) { + range_incl.start().field6 + } else { + 0 }; - let slice = vec_map.slice_range(lsn_range); + let blkno_end = if prefix == &key_to_prefix(&range_incl.end()) { + range_incl.end().field6 + } else { + 0xffffffff + }; - for (entry_lsn, pos) in slice.iter().rev() { - // TODO: this uses the page cache => https://github.com/neondatabase/neon/issues/8183 - let buf = reader.read_blob(*pos, &ctx).await; - if let Err(e) = buf { - reconstruct_state - .on_key_error(*key, PageReconstructError::from(anyhow!(e))); - break; - } + for (blkno, vec_map) in relation_idx.range(blkno_start..=blkno_end) { + let key = materialize_key(prefix, *blkno); + let lsn_range = match reconstruct_state.get_cached_lsn(&key) { + Some(cached_lsn) => (cached_lsn + 1)..end_lsn, + None => self.start_lsn..end_lsn, + }; - let value = Value::des(&buf.unwrap()); - if let Err(e) = value { - reconstruct_state - .on_key_error(*key, PageReconstructError::from(anyhow!(e))); - break; - } + let slice = vec_map.slice_range(lsn_range); - let key_situation = - reconstruct_state.update_key(key, *entry_lsn, value.unwrap()); - if key_situation == ValueReconstructSituation::Complete { - break; + for (entry_lsn, pos) in slice.iter().rev() { + // TODO: this uses the page cache => https://github.com/neondatabase/neon/issues/8183 + let buf = reader.read_blob(*pos, &ctx).await; + if let Err(e) = buf { + reconstruct_state + .on_key_error(key, PageReconstructError::from(anyhow!(e))); + break; + } + + let value = Value::des(&buf.unwrap()); + if let Err(e) = value { + reconstruct_state + .on_key_error(key, PageReconstructError::from(anyhow!(e))); + break; + } + + let key_situation = + reconstruct_state.update_key(&key, *entry_lsn, value.unwrap()); + if key_situation == ValueReconstructSituation::Complete { + break; + } } } }