fix: etcd range pagenation in table metadata migration tool (#2035)

This commit is contained in:
LFC
2023-07-25 18:02:26 +08:00
committed by GitHub
parent 0b4ac987cd
commit 48996b0646

View File

@@ -20,7 +20,10 @@ use common_meta::key::table_info::{TableInfoKey, TableInfoValue};
use common_meta::key::table_name::{TableNameKey, TableNameValue};
use common_meta::key::table_region::{RegionDistribution, TableRegionKey, TableRegionValue};
use common_meta::key::TableMetaKey;
use common_meta::rpc::store::{BatchPutRequest, PutRequest, RangeRequest};
use common_meta::rpc::store::{
BatchDeleteRequest, BatchPutRequest, PutRequest, RangeRequest, RangeResponse,
};
use common_meta::util::get_prefix_end_key;
use common_telemetry::info;
use etcd_client::Client;
use meta_srv::service::store::etcd::EtcdStore;
@@ -61,24 +64,58 @@ struct MigrateTableMetadata {
#[async_trait]
impl Tool for MigrateTableMetadata {
async fn do_work(&self) -> Result<()> {
let req = RangeRequest::new().with_prefix(b"__tg".to_vec());
let resp = self.etcd_store.range(req).await.unwrap();
for kv in resp.kvs {
let key = String::from_utf8_lossy(kv.key());
let value = TableGlobalValue::from_bytes(kv.value())
.unwrap_or_else(|e| panic!("table global value is corrupted: {e}, key: {key}"));
let mut key = b"__tg".to_vec();
let range_end = get_prefix_end_key(&key);
self.create_table_name_key(&value).await;
let mut processed_keys = 0;
loop {
info!("Start scanning key from: {}", String::from_utf8_lossy(&key));
self.create_datanode_table_keys(&value).await;
let req = RangeRequest::new()
.with_range(key, range_end.clone())
.with_limit(1000);
let resp = self.etcd_store.range(req).await.unwrap();
for kv in resp.kvs.iter() {
let key = String::from_utf8_lossy(kv.key());
let value = TableGlobalValue::from_bytes(kv.value())
.unwrap_or_else(|e| panic!("table global value is corrupted: {e}, key: {key}"));
self.split_table_global_value(&key, value).await;
self.create_table_name_key(&value).await;
self.create_datanode_table_keys(&value).await;
self.split_table_global_value(&key, value).await;
}
self.delete_migrated_keys(&resp).await;
processed_keys += resp.kvs.len();
if resp.more {
key = get_prefix_end_key(resp.kvs.last().unwrap().key());
} else {
break;
}
}
info!("Total migrated TableGlobalKeys: {processed_keys}");
Ok(())
}
}
impl MigrateTableMetadata {
async fn delete_migrated_keys(&self, resp: &RangeResponse) {
info!("Deleting {} TableGlobalKeys", resp.kvs.len());
let req = BatchDeleteRequest {
keys: resp.kvs.iter().map(|kv| kv.key().to_vec()).collect(),
prev_kv: false,
};
if self.dryrun {
info!("Dryrun: do nothing");
} else {
self.etcd_store.batch_delete(req).await.unwrap();
}
}
async fn split_table_global_value(&self, key: &str, value: TableGlobalValue) {
let table_id = value.table_id();
let region_distribution: RegionDistribution = value.regions_id_map.into_iter().collect();