fix: use mark-deletion for system catalog (#1874)

* fix: use mark-deletion for system catalog

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix the default value

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* clean tables

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2023-07-04 16:00:39 +08:00
committed by GitHub
parent 20f2fc4a2a
commit 746fe8b4fe
7 changed files with 115 additions and 12 deletions

View File

@@ -243,9 +243,12 @@ impl LocalCatalogManager {
info!("Registered schema: {:?}", s);
}
Entry::Table(t) => {
max_table_id = max_table_id.max(t.table_id);
if t.is_deleted {
continue;
}
self.open_and_register_table(&t).await?;
info!("Registered table: {:?}", t);
max_table_id = max_table_id.max(t.table_id);
}
}
}
@@ -602,6 +605,7 @@ mod tests {
table_name: "T1".to_string(),
table_id: 1,
engine: MITO_ENGINE.to_string(),
is_deleted: false,
}),
Entry::Catalog(CatalogEntry {
catalog_name: "C2".to_string(),
@@ -623,6 +627,7 @@ mod tests {
table_name: "T2".to_string(),
table_id: 2,
engine: MITO_ENGINE.to_string(),
is_deleted: false,
}),
];
let res = LocalCatalogManager::sort_entries(vec);

View File

@@ -203,20 +203,32 @@ pub fn build_table_insert_request(
build_insert_request(
EntryType::Table,
entry_key.as_bytes(),
serde_json::to_string(&TableEntryValue { table_name, engine })
.unwrap()
.as_bytes(),
serde_json::to_string(&TableEntryValue {
table_name,
engine,
is_deleted: false,
})
.unwrap()
.as_bytes(),
)
}
pub(crate) fn build_table_deletion_request(
request: &DeregisterTableRequest,
table_id: TableId,
) -> DeleteRequest {
let table_key = format_table_entry_key(&request.catalog, &request.schema, table_id);
DeleteRequest {
key_column_values: build_primary_key_columns(EntryType::Table, table_key.as_bytes()),
}
) -> InsertRequest {
let entry_key = format_table_entry_key(&request.catalog, &request.schema, table_id);
build_insert_request(
EntryType::Table,
entry_key.as_bytes(),
serde_json::to_string(&TableEntryValue {
table_name: "".to_string(),
engine: "".to_string(),
is_deleted: true,
})
.unwrap()
.as_bytes(),
)
}
fn build_primary_key_columns(entry_type: EntryType, key: &[u8]) -> HashMap<String, VectorRef> {
@@ -335,6 +347,7 @@ pub fn decode_system_catalog(
table_name: table_meta.table_name,
table_id,
engine: table_meta.engine,
is_deleted: table_meta.is_deleted,
}))
}
}
@@ -391,6 +404,7 @@ pub struct TableEntry {
pub table_name: String,
pub table_id: TableId,
pub engine: String,
pub is_deleted: bool,
}
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
@@ -399,12 +413,19 @@ pub struct TableEntryValue {
#[serde(default = "mito_engine")]
pub engine: String,
#[serde(default = "not_deleted")]
pub is_deleted: bool,
}
fn mito_engine() -> String {
MITO_ENGINE.to_string()
}
fn not_deleted() -> bool {
false
}
#[cfg(test)]
mod tests {
use common_recordbatch::RecordBatches;
@@ -563,6 +584,7 @@ mod tests {
table_name: "my_table".to_string(),
table_id: 1,
engine: MITO_ENGINE.to_string(),
is_deleted: false,
});
assert_eq!(entry, expected);
@@ -574,11 +596,11 @@ mod tests {
},
1,
);
let result = catalog_table.delete(table_deletion).await.unwrap();
let result = catalog_table.insert(table_deletion).await.unwrap();
assert_eq!(result, 1);
let records = catalog_table.records().await.unwrap();
let batches = RecordBatches::try_collect(records).await.unwrap().take();
assert_eq!(batches.len(), 0);
assert_eq!(batches.len(), 1);
}
}

View File

@@ -69,7 +69,7 @@ impl SystemCatalog {
) -> CatalogResult<()> {
self.information_schema
.system
.delete(build_table_deletion_request(request, table_id))
.insert(build_table_deletion_request(request, table_id))
.await
.map(|x| {
if x != 1 {

View File

@@ -51,3 +51,15 @@ tql eval (300, 300, '1s') 10 atan2 NaN;
| 1970-01-01T00:05:00 | NaN |
+---------------------+-------+
drop table trigx;
Affected Rows: 1
drop table trigy;
Affected Rows: 1
drop table trignan;
Affected Rows: 1

View File

@@ -33,3 +33,9 @@ tql eval (300, 300, '1s') 10 atan2 20;
-- eval instant at 5m 10 atan2 NaN
-- NaN
tql eval (300, 300, '1s') 10 atan2 NaN;
drop table trigx;
drop table trigy;
drop table trignan;

View File

@@ -0,0 +1,42 @@
create table t1 (a timestamp time index);
Affected Rows: 0
create table t2 (b timestamp time index);
Affected Rows: 0
drop table t1;
Affected Rows: 1
drop table t2;
Affected Rows: 1
-- SQLNESS ARG restart=true
show tables;
+---------+
| Tables |
+---------+
| numbers |
| scripts |
+---------+
create table t3 (c timestamp time index);
Affected Rows: 0
desc table t3;
+-------+----------------------+------+---------+---------------+
| Field | Type | Null | Default | Semantic Type |
+-------+----------------------+------+---------+---------------+
| c | TimestampMillisecond | NO | | TIME INDEX |
+-------+----------------------+------+---------+---------------+
drop table t3;
Affected Rows: 1

View File

@@ -0,0 +1,16 @@
create table t1 (a timestamp time index);
create table t2 (b timestamp time index);
drop table t1;
drop table t2;
-- SQLNESS ARG restart=true
show tables;
create table t3 (c timestamp time index);
desc table t3;
drop table t3;