mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-03 20:02:54 +00:00
feat: implement converter convert
This commit is contained in:
@@ -18,7 +18,6 @@ use datanode::config::StorageConfig;
|
||||
use meta_client::MetaClientOptions;
|
||||
use mito2::sst::file::IndexType;
|
||||
use mito2::sst::parquet::SstInfo;
|
||||
use object_store::ObjectStore;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use sst_convert::converter::{InputFile, InputFileType, SstConverter, SstConverterBuilder};
|
||||
|
||||
@@ -68,7 +67,7 @@ async fn main() {
|
||||
toml::from_str(&storage_config).expect("Failed to parse storage config");
|
||||
|
||||
// TODO: build sst converter
|
||||
let sst_converter = {
|
||||
let mut sst_converter = {
|
||||
let mut builder = SstConverterBuilder::new_fs(args.input_dir)
|
||||
.with_meta_options(meta_options)
|
||||
.with_storage_config(storage_config);
|
||||
@@ -83,7 +82,7 @@ async fn main() {
|
||||
.expect("Failed to build sst converter")
|
||||
};
|
||||
|
||||
let input_store: &ObjectStore = &sst_converter.input_store;
|
||||
let input_store = sst_converter.input_store.clone();
|
||||
|
||||
if let Some(parquet_dir) = args.parquet_dir {
|
||||
// using opendal to read parquet files in given input object store
|
||||
@@ -111,7 +110,7 @@ async fn main() {
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
convert_and_send(&input_files, &sst_converter, &args.db_http_addr).await;
|
||||
convert_and_send(&input_files, &mut sst_converter, &args.db_http_addr).await;
|
||||
}
|
||||
|
||||
if let Some(json_dir) = args.json_dir {
|
||||
@@ -140,13 +139,13 @@ async fn main() {
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
convert_and_send(&input_files, &sst_converter, &args.db_http_addr).await;
|
||||
convert_and_send(&input_files, &mut sst_converter, &args.db_http_addr).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn convert_and_send(
|
||||
input_files: &[InputFile],
|
||||
sst_converter: &SstConverter,
|
||||
sst_converter: &mut SstConverter,
|
||||
db_http_addr: &str,
|
||||
) {
|
||||
let table_names = input_files
|
||||
|
||||
@@ -40,6 +40,7 @@ use crate::error::{
|
||||
EmptyRegionDirSnafu, InvalidMetadataSnafu, ObjectStoreNotFoundSnafu, RegionCorruptedSnafu,
|
||||
Result, StaleLogEntrySnafu,
|
||||
};
|
||||
use crate::manifest::action::RegionManifest;
|
||||
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
|
||||
use crate::manifest::storage::manifest_compress_type;
|
||||
use crate::memtable::time_partition::TimePartitions;
|
||||
@@ -489,6 +490,16 @@ impl RegionMetadataLoader {
|
||||
region_dir: &str,
|
||||
region_options: &RegionOptions,
|
||||
) -> Result<Option<RegionMetadataRef>> {
|
||||
let manifest = self.load_manifest(region_dir, region_options).await?;
|
||||
Ok(manifest.map(|m| m.metadata.clone()))
|
||||
}
|
||||
|
||||
/// Loads the manifest of the region from the region dir.
|
||||
pub async fn load_manifest(
|
||||
&self,
|
||||
region_dir: &str,
|
||||
region_options: &RegionOptions,
|
||||
) -> Result<Option<Arc<RegionManifest>>> {
|
||||
let region_manifest_options = RegionOpener::manifest_options(
|
||||
&self.config,
|
||||
region_options,
|
||||
@@ -503,7 +514,7 @@ impl RegionMetadataLoader {
|
||||
};
|
||||
|
||||
let manifest = manifest_manager.manifest();
|
||||
Ok(Some(manifest.metadata.clone()))
|
||||
Ok(Some(manifest))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -14,17 +14,24 @@
|
||||
|
||||
//! SST converter.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use datanode::config::{FileConfig, StorageConfig};
|
||||
use datanode::datanode::DatanodeBuilder;
|
||||
use datanode::store::fs::new_fs_object_store;
|
||||
use meta_client::MetaClientOptions;
|
||||
use mito2::access_layer::SstInfoArray;
|
||||
use mito2::config::MitoConfig;
|
||||
use mito2::read::Source;
|
||||
use mito2::sst::parquet::WriteOptions;
|
||||
use object_store::manager::ObjectStoreManagerRef;
|
||||
use object_store::ObjectStore;
|
||||
use snafu::ResultExt;
|
||||
|
||||
use crate::error::{DatanodeSnafu, Result};
|
||||
use crate::error::{DatanodeSnafu, MitoSnafu, Result};
|
||||
use crate::reader::InputReaderBuilder;
|
||||
use crate::table::TableMetadataHelper;
|
||||
use crate::writer::RegionWriterBuilder;
|
||||
|
||||
/// Input file type.
|
||||
pub enum InputFileType {
|
||||
@@ -60,20 +67,19 @@ pub struct OutputSst {
|
||||
pub struct SstConverter {
|
||||
/// Object store for input files.
|
||||
pub input_store: ObjectStore,
|
||||
/// Object store manager for output files.
|
||||
output_store_manager: ObjectStoreManagerRef,
|
||||
/// Helper to get table meta.
|
||||
table_helper: TableMetadataHelper,
|
||||
/// Output path for the converted SST files.
|
||||
/// If it is not None, the converted SST files will be written to the specified path
|
||||
/// in the `input_store`.
|
||||
/// This is for debugging purposes.
|
||||
output_path: Option<String>,
|
||||
reader_builder: InputReaderBuilder,
|
||||
writer_builder: RegionWriterBuilder,
|
||||
write_opts: WriteOptions,
|
||||
}
|
||||
|
||||
impl SstConverter {
|
||||
/// Converts a list of input to a list of outputs.
|
||||
pub async fn convert(&self, input: &[InputFile]) -> Result<Vec<OutputSst>> {
|
||||
pub async fn convert(&mut self, input: &[InputFile]) -> Result<Vec<OutputSst>> {
|
||||
let mut outputs = Vec::with_capacity(input.len());
|
||||
for file in input {
|
||||
let output = self.convert_one(file).await?;
|
||||
@@ -83,21 +89,24 @@ impl SstConverter {
|
||||
}
|
||||
|
||||
/// Converts one input.
|
||||
async fn convert_one(&self, input: &InputFile) -> Result<OutputSst> {
|
||||
match input.file_type {
|
||||
InputFileType::Parquet => self.convert_parquet(input).await,
|
||||
InputFileType::RemoteWrite => self.convert_remote_write(input).await,
|
||||
}
|
||||
}
|
||||
async fn convert_one(&mut self, input: &InputFile) -> Result<OutputSst> {
|
||||
let reader_info = self.reader_builder.read_input(input).await?;
|
||||
let source = Source::Reader(reader_info.reader);
|
||||
let output_dir = self
|
||||
.output_path
|
||||
.as_deref()
|
||||
.unwrap_or(&reader_info.region_dir);
|
||||
let writer = self
|
||||
.writer_builder
|
||||
.build(reader_info.metadata, output_dir, reader_info.region_options)
|
||||
.await
|
||||
.context(MitoSnafu)?;
|
||||
|
||||
/// Converts a parquet input.
|
||||
async fn convert_parquet(&self, input: &InputFile) -> Result<OutputSst> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
/// Converts a remote write input.
|
||||
async fn convert_remote_write(&self, input: &InputFile) -> Result<OutputSst> {
|
||||
todo!()
|
||||
let ssts = writer
|
||||
.write_sst(source, &self.write_opts)
|
||||
.await
|
||||
.context(MitoSnafu)?;
|
||||
Ok(OutputSst { ssts })
|
||||
}
|
||||
}
|
||||
|
||||
@@ -107,6 +116,7 @@ pub struct SstConverterBuilder {
|
||||
meta_options: MetaClientOptions,
|
||||
storage_config: StorageConfig,
|
||||
output_path: Option<String>,
|
||||
config: MitoConfig,
|
||||
}
|
||||
|
||||
impl SstConverterBuilder {
|
||||
@@ -117,6 +127,7 @@ impl SstConverterBuilder {
|
||||
meta_options: MetaClientOptions::default(),
|
||||
storage_config: StorageConfig::default(),
|
||||
output_path: None,
|
||||
config: MitoConfig::default(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -139,17 +150,34 @@ impl SstConverterBuilder {
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets the config for the converted SST files.
|
||||
pub fn with_config(mut self, config: MitoConfig) -> Self {
|
||||
self.config = config;
|
||||
self
|
||||
}
|
||||
|
||||
/// Builds a SST converter.
|
||||
pub async fn build(self) -> Result<SstConverter> {
|
||||
let input_store = new_input_store(&self.input_path).await?;
|
||||
let output_store_manager = new_object_store_manager(&self.storage_config).await?;
|
||||
let table_helper = TableMetadataHelper::new(&self.meta_options).await?;
|
||||
let config = Arc::new(self.config);
|
||||
let reader_builder = InputReaderBuilder::new(
|
||||
input_store.clone(),
|
||||
table_helper,
|
||||
output_store_manager.clone(),
|
||||
config.clone(),
|
||||
);
|
||||
let writer_builder = RegionWriterBuilder::new(config, output_store_manager)
|
||||
.await
|
||||
.context(MitoSnafu)?;
|
||||
|
||||
Ok(SstConverter {
|
||||
input_store,
|
||||
output_store_manager,
|
||||
table_helper,
|
||||
output_path: self.output_path,
|
||||
reader_builder,
|
||||
writer_builder,
|
||||
write_opts: WriteOptions::default(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,5 +14,164 @@
|
||||
|
||||
//! Reader to read input data in different formats.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use mito2::config::MitoConfig;
|
||||
use mito2::read::BoxedBatchReader;
|
||||
use mito2::region::opener::RegionMetadataLoader;
|
||||
use mito2::region::options::RegionOptions;
|
||||
use object_store::manager::ObjectStoreManagerRef;
|
||||
use object_store::ObjectStore;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
use store_api::path_utils::region_dir;
|
||||
use store_api::storage::{RegionId, SequenceNumber};
|
||||
use table::metadata::TableId;
|
||||
|
||||
use crate::converter::{InputFile, InputFileType};
|
||||
use crate::error::{MissingTableSnafu, MitoSnafu, Result};
|
||||
use crate::reader::remote_write::RemoteWriteReader;
|
||||
use crate::table::TableMetadataHelper;
|
||||
use crate::OpenDALParquetReader;
|
||||
|
||||
pub(crate) mod parquet;
|
||||
mod remote_write;
|
||||
|
||||
/// Reader and context.
|
||||
pub struct ReaderInfo {
|
||||
pub reader: BoxedBatchReader,
|
||||
pub region_dir: String,
|
||||
pub region_options: RegionOptions,
|
||||
pub metadata: RegionMetadataRef,
|
||||
}
|
||||
|
||||
/// Builder to build readers to read input files.
|
||||
pub(crate) struct InputReaderBuilder {
|
||||
input_store: ObjectStore,
|
||||
table_helper: TableMetadataHelper,
|
||||
region_loader: RegionMetadataLoader,
|
||||
/// Cached region infos for tables.
|
||||
region_infos: HashMap<String, RegionInfo>,
|
||||
}
|
||||
|
||||
impl InputReaderBuilder {
|
||||
pub(crate) fn new(
|
||||
input_store: ObjectStore,
|
||||
table_helper: TableMetadataHelper,
|
||||
object_store_manager: ObjectStoreManagerRef,
|
||||
config: Arc<MitoConfig>,
|
||||
) -> Self {
|
||||
let region_loader = RegionMetadataLoader::new(config, object_store_manager);
|
||||
|
||||
Self {
|
||||
input_store,
|
||||
table_helper,
|
||||
region_loader,
|
||||
region_infos: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Builds a reader to read the input file.
|
||||
pub async fn read_input(&mut self, input: &InputFile) -> Result<ReaderInfo> {
|
||||
match input.file_type {
|
||||
InputFileType::Parquet => self.read_parquet(input).await,
|
||||
InputFileType::RemoteWrite => self.read_remote_write(input).await,
|
||||
}
|
||||
}
|
||||
|
||||
/// Builds a reader to read the parquet file.
|
||||
pub async fn read_parquet(&mut self, input: &InputFile) -> Result<ReaderInfo> {
|
||||
let region_info = self.get_region_info(input).await?;
|
||||
let reader = OpenDALParquetReader::new(
|
||||
self.input_store.clone(),
|
||||
&input.path,
|
||||
region_info.metadata.clone(),
|
||||
Some(region_info.flushed_sequence + 1),
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(ReaderInfo {
|
||||
reader: Box::new(reader),
|
||||
region_dir: region_info.region_dir,
|
||||
region_options: region_info.region_options,
|
||||
metadata: region_info.metadata,
|
||||
})
|
||||
}
|
||||
|
||||
/// Builds a reader to read the remote write file.
|
||||
pub async fn read_remote_write(&mut self, input: &InputFile) -> Result<ReaderInfo> {
|
||||
let region_info = self.get_region_info(input).await?;
|
||||
// TODO(yingwen): Should update the sequence.
|
||||
let reader = RemoteWriteReader::open(
|
||||
self.input_store.clone(),
|
||||
&input.path,
|
||||
&input.catalog,
|
||||
&input.schema,
|
||||
region_info.metadata.clone(),
|
||||
self.table_helper.clone(),
|
||||
)
|
||||
.await?
|
||||
.with_sequence(region_info.flushed_sequence + 1);
|
||||
|
||||
Ok(ReaderInfo {
|
||||
reader: Box::new(reader),
|
||||
region_dir: region_info.region_dir,
|
||||
region_options: region_info.region_options,
|
||||
metadata: region_info.metadata,
|
||||
})
|
||||
}
|
||||
|
||||
async fn get_region_info(&mut self, input: &InputFile) -> Result<RegionInfo> {
|
||||
let cache_key = cache_key(&input.catalog, &input.schema, &input.table);
|
||||
if let Some(region_info) = self.region_infos.get(&cache_key) {
|
||||
return Ok(region_info.clone());
|
||||
}
|
||||
|
||||
let table_info = self
|
||||
.table_helper
|
||||
.get_table(&input.catalog, &input.schema, &input.table)
|
||||
.await?
|
||||
.context(MissingTableSnafu {
|
||||
table_name: &input.table,
|
||||
})?;
|
||||
let region_id = to_region_id(table_info.table_info.ident.table_id);
|
||||
let opts = table_info.table_info.to_region_options();
|
||||
// TODO(yingwen): We ignore WAL options now. We should `prepare_wal_options()` in the future.
|
||||
let region_options = RegionOptions::try_from(&opts).context(MitoSnafu)?;
|
||||
let region_dir = region_dir(&table_info.region_storage_path(), region_id);
|
||||
let manifest = self
|
||||
.region_loader
|
||||
.load_manifest(®ion_dir, ®ion_options)
|
||||
.await
|
||||
.context(MitoSnafu)?
|
||||
.context(MissingTableSnafu {
|
||||
table_name: &table_info.table_info.name,
|
||||
})?;
|
||||
let region_info = RegionInfo {
|
||||
metadata: manifest.metadata.clone(),
|
||||
flushed_sequence: manifest.flushed_sequence,
|
||||
region_dir,
|
||||
region_options,
|
||||
};
|
||||
self.region_infos.insert(cache_key, region_info.clone());
|
||||
|
||||
Ok(region_info)
|
||||
}
|
||||
}
|
||||
|
||||
fn to_region_id(table_id: TableId) -> RegionId {
|
||||
RegionId::new(table_id, 0)
|
||||
}
|
||||
|
||||
fn cache_key(catalog: &str, schema: &str, table: &str) -> String {
|
||||
format!("{}/{}/{}", catalog, schema, table)
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct RegionInfo {
|
||||
metadata: RegionMetadataRef,
|
||||
flushed_sequence: SequenceNumber,
|
||||
region_dir: String,
|
||||
region_options: RegionOptions,
|
||||
}
|
||||
|
||||
@@ -29,7 +29,7 @@ use datatypes::schema::Schema;
|
||||
use datatypes::value::Value;
|
||||
use datatypes::vectors::{MutableVector, UInt64VectorBuilder, UInt8VectorBuilder};
|
||||
use futures_util::StreamExt;
|
||||
use mito2::error::{OpenDalSnafu, ReadParquetSnafu};
|
||||
use mito2::error::ReadParquetSnafu;
|
||||
use mito2::read::{Batch, BatchColumn, BatchReader};
|
||||
use mito2::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, SparsePrimaryKeyCodec};
|
||||
use object_store::ObjectStore;
|
||||
@@ -40,6 +40,8 @@ use store_api::codec::PrimaryKeyEncoding;
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
use store_api::storage::{ColumnId, SequenceNumber};
|
||||
|
||||
use crate::error::{MitoSnafu, ObjectStoreSnafu, Result};
|
||||
|
||||
pub struct OpenDALParquetReader {
|
||||
inner: RawParquetReader<AsyncReader>,
|
||||
}
|
||||
@@ -50,28 +52,23 @@ impl OpenDALParquetReader {
|
||||
path: &str,
|
||||
metadata: RegionMetadataRef,
|
||||
override_sequence: Option<SequenceNumber>,
|
||||
) -> Result<Self, BoxedError> {
|
||||
let reader = operator
|
||||
.reader_with(path)
|
||||
.await
|
||||
.context(OpenDalSnafu)
|
||||
.map_err(BoxedError::new)?;
|
||||
) -> Result<Self> {
|
||||
let reader = operator.reader_with(path).await.context(ObjectStoreSnafu)?;
|
||||
|
||||
let content_len = operator
|
||||
.stat(path)
|
||||
.await
|
||||
.context(OpenDalSnafu)
|
||||
.map_err(BoxedError::new)?
|
||||
.context(ObjectStoreSnafu)?
|
||||
.content_length();
|
||||
|
||||
let reader = AsyncReader::new(reader, content_len).with_prefetch_footer_size(512 * 1024);
|
||||
let stream = ParquetRecordBatchStreamBuilder::new(reader)
|
||||
.await
|
||||
.context(ReadParquetSnafu { path })
|
||||
.map_err(BoxedError::new)?
|
||||
.context(MitoSnafu)?
|
||||
.build()
|
||||
.context(ReadParquetSnafu { path })
|
||||
.map_err(BoxedError::new)?;
|
||||
.context(MitoSnafu)?;
|
||||
Ok(Self {
|
||||
inner: RawParquetReader::new(stream, metadata, override_sequence, path),
|
||||
})
|
||||
|
||||
@@ -46,7 +46,7 @@ const METRIC_NAME_LABEL: &str = "__name__";
|
||||
const GREPTIME_VALUE: &str = "greptime_value";
|
||||
|
||||
/// A reader that reads remote write file, sorts and outputs timeseries in the primary key order.
|
||||
struct RemoteWriteReader {
|
||||
pub struct RemoteWriteReader {
|
||||
/// Timeseries sorted by primary key in reverse order.
|
||||
/// So we can pop the series.
|
||||
series: Vec<(Vec<u8>, TimeSeries)>,
|
||||
@@ -60,8 +60,8 @@ impl RemoteWriteReader {
|
||||
pub async fn open(
|
||||
operator: ObjectStore,
|
||||
path: &str,
|
||||
catalog: String,
|
||||
schema: String,
|
||||
catalog: &str,
|
||||
schema: &str,
|
||||
metadata: RegionMetadataRef,
|
||||
table_helper: TableMetadataHelper,
|
||||
) -> Result<Self> {
|
||||
@@ -73,8 +73,8 @@ impl RemoteWriteReader {
|
||||
})?
|
||||
.column_id;
|
||||
let encoder = PrimaryKeyEncoder {
|
||||
catalog,
|
||||
schema,
|
||||
catalog: catalog.to_string(),
|
||||
schema: schema.to_string(),
|
||||
metadata,
|
||||
table_helper,
|
||||
table_ids: HashMap::new(),
|
||||
|
||||
@@ -21,15 +21,13 @@ use mito2::access_layer::{
|
||||
};
|
||||
use mito2::cache::CacheManager;
|
||||
use mito2::config::MitoConfig;
|
||||
use mito2::error::{RegionMetadataNotFoundSnafu, Result};
|
||||
use mito2::error::Result;
|
||||
use mito2::read::Source;
|
||||
use mito2::region::opener::RegionMetadataLoader;
|
||||
use mito2::region::options::RegionOptions;
|
||||
use mito2::sst::index::intermediate::IntermediateManager;
|
||||
use mito2::sst::index::puffin_manager::PuffinManagerFactory;
|
||||
use mito2::sst::parquet::WriteOptions;
|
||||
use object_store::manager::ObjectStoreManagerRef;
|
||||
use snafu::OptionExt;
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
|
||||
/// A writer that can create multiple SST files for a region.
|
||||
@@ -69,21 +67,19 @@ impl RegionWriter {
|
||||
}
|
||||
|
||||
/// Creator to create [`RegionWriter`] for different regions.
|
||||
pub struct RegionWriterCreator {
|
||||
pub struct RegionWriterBuilder {
|
||||
/// Mito engine config.
|
||||
config: Arc<MitoConfig>,
|
||||
/// Object stores.
|
||||
object_store_manager: ObjectStoreManagerRef,
|
||||
/// Loader to load region metadata.
|
||||
metadata_loader: RegionMetadataLoader,
|
||||
puffin_manager_factory: PuffinManagerFactory,
|
||||
intermediate_manager: IntermediateManager,
|
||||
}
|
||||
|
||||
impl RegionWriterCreator {
|
||||
impl RegionWriterBuilder {
|
||||
/// Create a new [`RegionContextCreator`].
|
||||
pub async fn new(
|
||||
config: MitoConfig,
|
||||
config: Arc<MitoConfig>,
|
||||
object_store_manager: ObjectStoreManagerRef,
|
||||
) -> Result<Self> {
|
||||
let puffin_manager_factory = PuffinManagerFactory::new(
|
||||
@@ -96,14 +92,10 @@ impl RegionWriterCreator {
|
||||
let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
|
||||
.await?
|
||||
.with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
|
||||
let config = Arc::new(config);
|
||||
let metadata_loader =
|
||||
RegionMetadataLoader::new(config.clone(), object_store_manager.clone());
|
||||
|
||||
Ok(Self {
|
||||
config,
|
||||
object_store_manager,
|
||||
metadata_loader,
|
||||
puffin_manager_factory,
|
||||
intermediate_manager,
|
||||
})
|
||||
@@ -112,14 +104,10 @@ impl RegionWriterCreator {
|
||||
/// Builds a [`RegionWriter`] for the given region directory.
|
||||
pub async fn build(
|
||||
&self,
|
||||
metadata: RegionMetadataRef,
|
||||
region_dir: &str,
|
||||
region_options: RegionOptions,
|
||||
) -> Result<RegionWriter> {
|
||||
let metadata = self
|
||||
.metadata_loader
|
||||
.load(region_dir, ®ion_options)
|
||||
.await?
|
||||
.context(RegionMetadataNotFoundSnafu)?;
|
||||
let object_store = mito2::region::opener::get_object_store(
|
||||
®ion_options.storage,
|
||||
&self.object_store_manager,
|
||||
|
||||
Reference in New Issue
Block a user