mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-16 18:02:56 +00:00
wip2
This commit is contained in:
@@ -155,17 +155,9 @@ async fn import_rel(
|
||||
//
|
||||
// FIXME: Keep track of which relations we've already created?
|
||||
// https://github.com/neondatabase/neon/issues/3309
|
||||
if let Err(e) = modification
|
||||
modification
|
||||
.put_rel_creation(rel, nblocks as u32, ctx)
|
||||
.await
|
||||
{
|
||||
match e {
|
||||
RelationError::AlreadyExists => {
|
||||
debug!("Relation {} already exist. We must be extending it.", rel)
|
||||
}
|
||||
_ => return Err(e.into()),
|
||||
}
|
||||
}
|
||||
.await?;
|
||||
|
||||
loop {
|
||||
let r = reader.read_exact(&mut buf).await;
|
||||
|
||||
@@ -1041,6 +1041,7 @@ enum MetadataOp {
|
||||
// - Insert to DBDIR_KEY if this (spcnode, dbnode) does not already exist
|
||||
// - Insert to rel_dir_to_key(spcnode, dbnode)
|
||||
UpsertRelDirectory { spcnode: Oid, dbnode: Oid },
|
||||
UpsertRelDirectory2 { rel: RelTag, nblocks: BlockNumber },
|
||||
// - Insert this xid to TWOPHASEDIR_KEY
|
||||
// UpdateTwoPhaseDir{
|
||||
// xid: TransactionId
|
||||
@@ -1180,6 +1181,77 @@ impl MetadataOp {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Self::UpsertRelDirectory2 { rel, nblocks } => {
|
||||
// It's possible that this is the first rel for this db in this
|
||||
// tablespace. Create the reldir entry for it if so.
|
||||
let mut dbdir = DbDirectory::des(
|
||||
&data_dir_mod
|
||||
.metadata_state
|
||||
.get(lsn, DBDIR_KEY, ctx)
|
||||
.await
|
||||
.context("read db")?,
|
||||
)
|
||||
.context("deserialize db")?;
|
||||
let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
|
||||
let mut rel_dir = if let hash_map::Entry::Vacant(e) =
|
||||
dbdir.dbdirs.entry((rel.spcnode, rel.dbnode))
|
||||
{
|
||||
// Didn't exist. Update dbdir
|
||||
e.insert(false);
|
||||
let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
|
||||
data_dir_mod
|
||||
.metadata_state
|
||||
.pending_directory_entries
|
||||
.push((DirectoryKind::Db, dbdir.dbdirs.len()));
|
||||
data_dir_mod.put_metadata_page(lsn, DBDIR_KEY, buf.into());
|
||||
|
||||
// and create the RelDirectory
|
||||
RelDirectory::default()
|
||||
} else {
|
||||
// reldir already exists, fetch it
|
||||
RelDirectory::des(
|
||||
&data_dir_mod
|
||||
.metadata_state
|
||||
.get(lsn, rel_dir_key, ctx)
|
||||
.await
|
||||
.context("read db")?,
|
||||
)
|
||||
.context("deserialize db")?
|
||||
};
|
||||
|
||||
// Add the new relation to the rel directory entry, and write it back
|
||||
if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
|
||||
// Drop out early if the relation already existed
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
data_dir_mod
|
||||
.metadata_state
|
||||
.pending_directory_entries
|
||||
.push((DirectoryKind::Rel, rel_dir.rels.len()));
|
||||
|
||||
data_dir_mod.put_metadata_page(
|
||||
lsn,
|
||||
rel_dir_key,
|
||||
Bytes::from(RelDirectory::ser(&rel_dir).context("serialize")?),
|
||||
);
|
||||
|
||||
// Put size
|
||||
let size_key = rel_size_to_key(rel);
|
||||
let buf = nblocks.to_le_bytes();
|
||||
data_dir_mod.put_metadata_page(lsn, size_key, Bytes::from(buf.to_vec()));
|
||||
|
||||
data_dir_mod.pending_nblocks += nblocks as i64;
|
||||
|
||||
// Update relation size cache
|
||||
data_dir_mod
|
||||
.metadata_state
|
||||
.tline
|
||||
.set_cached_rel_size(rel, lsn, nblocks);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1463,56 +1535,15 @@ impl<'a> DatadirModification<'a> {
|
||||
rel: RelTag,
|
||||
nblocks: BlockNumber,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), RelationError> {
|
||||
if rel.relnode == 0 {
|
||||
return Err(RelationError::InvalidRelnode);
|
||||
}
|
||||
// It's possible that this is the first rel for this db in this
|
||||
// tablespace. Create the reldir entry for it if so.
|
||||
let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?)
|
||||
.context("deserialize db")?;
|
||||
let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
|
||||
let mut rel_dir =
|
||||
if let hash_map::Entry::Vacant(e) = dbdir.dbdirs.entry((rel.spcnode, rel.dbnode)) {
|
||||
// Didn't exist. Update dbdir
|
||||
e.insert(false);
|
||||
let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::Db, dbdir.dbdirs.len()));
|
||||
self.put(DBDIR_KEY, Value::Image(buf.into()));
|
||||
) -> anyhow::Result<()> {
|
||||
// TODO: here, or earlier, validate that rel.relnode != 0 -- perhaps on construction of the RelTag?
|
||||
|
||||
// and create the RelDirectory
|
||||
RelDirectory::default()
|
||||
} else {
|
||||
// reldir already exists, fetch it
|
||||
RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
|
||||
.context("deserialize db")?
|
||||
};
|
||||
|
||||
// Add the new relation to the rel directory entry, and write it back
|
||||
if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
|
||||
return Err(RelationError::AlreadyExists);
|
||||
}
|
||||
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::Rel, rel_dir.rels.len()));
|
||||
|
||||
self.put(
|
||||
rel_dir_key,
|
||||
Value::Image(Bytes::from(
|
||||
RelDirectory::ser(&rel_dir).context("serialize")?,
|
||||
)),
|
||||
);
|
||||
|
||||
// Put size
|
||||
let size_key = rel_size_to_key(rel);
|
||||
let buf = nblocks.to_le_bytes();
|
||||
self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
|
||||
|
||||
self.pending_nblocks += nblocks as i64;
|
||||
|
||||
// Update relation size cache
|
||||
self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
|
||||
self.put_metadata_op(
|
||||
self.lsn,
|
||||
MetadataOp::UpsertRelDirectory2 { rel, nblocks },
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Even if nblocks > 0, we don't insert any actual blocks here. That's up to the
|
||||
// caller.
|
||||
|
||||
Reference in New Issue
Block a user