first commit

This commit is contained in:
2025-08-06 06:33:11 +00:00
commit 0679c3a733
10 changed files with 3368 additions and 0 deletions

21
.gitignore vendored Normal file
View File

@ -0,0 +1,21 @@
# ---> Rust
# Generated by Cargo
# will have compiled files and executables
debug/
target/
# These are backup files generated by rustfmt
**/*.rs.bk
# MSVC Windows builds of rustc generate these, which store debugging information
*.pdb
# Database files
*.db
# RustRover
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

1218
Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

26
Cargo.toml Normal file
View File

@ -0,0 +1,26 @@
[package]
name = "tesnig"
version = "0.1.0"
edition = "2024"
[[bin]]
name = "oversdb"
path = "bin/oversdb.rs"
[[bin]]
name = "skytable"
path = "bin/skytable.rs"
[[bin]]
name = "redis"
path = "bin/redis.rs"
[lib]
name = "benchmark_oversdb"
path = "src/lib.rs"
[dependencies]
skytable = "0.8.12"
tokio = { version = "1.0", features = ["full"] }
async-trait = "0.1"
redis = { version = "0.32.4", features = ["tokio-comp"] }

133
bin/oversdb.rs Normal file
View File

@ -0,0 +1,133 @@
use benchmark_oversdb::{StressTester, OversDBClient};
#[tokio::main]
async fn main() -> Result<(), String> {
let args: Vec<String> = std::env::args().collect();
if args.len() < 2 {
println!("OversDB Stress Test & Benchmark Tool");
println!("====================================");
println!();
println!("Usage: {} <test_type> [options]", args[0]);
println!();
println!("Test types:");
println!(" quick - Quick test (1000 ops, 10 clients)");
println!(" standard - Standard test (10000 ops, 50 clients)");
println!(" intensive - Intensive test (100000 ops, 100 clients)");
println!(" extreme - Extreme test (1000000 ops, 200 clients)");
println!(" debug - Debug test (100 ops, 5 clients)");
println!(" custom <ops> <clients> <value_size> - Custom parameters");
println!();
println!("Examples:");
println!(" {} quick", args[0]);
println!(" {} custom 50000 75 1024", args[0]);
return Ok(());
}
let tester = StressTester::new("167.99.33.194:6601".to_string());
println!("🔧 Connecting to OversDB server at 167.99.33.194:6601...");
match OversDBClient::connect("167.99.33.194:6601").await {
Ok(_) => println!("✓ Connection successful!"),
Err(e) => {
println!("✗ Failed to connect: {}", e);
println!("Make sure the OversDB server is running on 167.99.33.194:6601");
return Err(e);
}
}
println!();
let (ops, clients, value_size) = match args[1].as_str() {
"quick" => (1_000, 10, 256),
"standard" => (10_000, 50, 256),
"intensive" => (100_000, 100, 256),
"extreme" => (1_000_000, 200, 256),
"debug" => (100, 5, 64), // Small debug test
"custom" => {
if args.len() < 5 {
return Err("Custom test requires: custom <ops> <clients> <value_size>".to_string());
}
let ops = args[2].parse().map_err(|_| "Invalid operations count")?;
let clients = args[3].parse().map_err(|_| "Invalid client count")?;
let value_size = args[4].parse().map_err(|_| "Invalid value size")?;
(ops, clients, value_size)
},
_ => return Err("Unknown test type. Use: quick, standard, intensive, extreme, debug, or custom".to_string()),
};
println!("🎯 Test Configuration:");
println!(" Total operations: {}", ops);
println!(" Concurrent clients: {}", clients);
println!(" Value size: {} bytes", value_size);
println!();
// Prepare some data for read tests
println!("📝 Preparing test data...");
let mut prep_client = OversDBClient::connect("167.99.33.194:6601").await?;
let prep_value = vec![0x42u8; value_size];
for i in 0..10 {
for j in 0..100 {
let key = format!("worker{}:key{}", i, j);
let _ = prep_client.create("benchmark", "test", key.as_bytes(), &prep_value, 0).await;
}
}
println!("✓ Test data prepared");
println!();
// Run benchmarks
let mut results = Vec::new();
// CREATE benchmark
match tester.benchmark_create(ops, clients, value_size).await {
Ok(result) => {
result.print();
results.push(result);
},
Err(e) => println!("CREATE benchmark failed: {}", e),
}
// READ benchmark
match tester.benchmark_read(ops, clients).await {
Ok(result) => {
result.print();
results.push(result);
},
Err(e) => println!("READ benchmark failed: {}", e),
}
// MIXED benchmark
match tester.benchmark_mixed(ops, clients, value_size).await {
Ok(result) => {
result.print();
results.push(result);
},
Err(e) => println!("MIXED benchmark failed: {}", e),
}
// Summary
println!("📈 SUMMARY");
println!("==========");
for result in &results {
let error_rate = if result.total_operations + result.errors > 0 {
format!("{:.1}%", (result.errors as f64 / (result.total_operations + result.errors) as f64) * 100.0)
} else {
"0.0%".to_string()
};
println!("{:8} | {:>10.0} ops/sec | {:>8.2}μs avg latency | {} errors ({})",
result.operation,
result.ops_per_second,
result.avg_latency_us,
result.errors,
error_rate
);
}
if let Some(best) = results.iter().filter(|r| r.ops_per_second > 0.0).max_by(|a, b| a.ops_per_second.partial_cmp(&b.ops_per_second).unwrap()) {
println!();
println!("🏆 Best performance: {} with {:.0} ops/sec", best.operation, best.ops_per_second);
}
Ok(())
}

188
bin/redis.rs Normal file
View File

@ -0,0 +1,188 @@
use benchmark_oversdb::{StressTester, RedisClient};
use std::env;
#[tokio::main]
async fn main() -> Result<(), String> {
let args: Vec<String> = std::env::args().collect();
if args.len() < 2 {
println!("Redis Stress Test & Benchmark Tool");
println!("==================================");
println!();
println!("Usage: {} <test_type> [options]", args[0]);
println!();
println!("Test types:");
println!(" quick - Quick test (1000 ops, 10 clients)");
println!(" standard - Standard test (10000 ops, 50 clients)");
println!(" intensive - Intensive test (100000 ops, 100 clients)");
println!(" extreme - Extreme test (1000000 ops, 200 clients)");
println!(" debug - Debug test (100 ops, 5 clients)");
println!(" custom <ops> <clients> <value_size> - Custom parameters");
println!();
println!("Environment variables:");
println!(" REDIS_HOST - Server host (default: 127.0.0.1)");
println!(" REDIS_PORT - Server port (default: 6379)");
println!(" REDIS_PASSWORD - Password (optional)");
println!();
println!("Examples:");
println!(" {} quick", args[0]);
println!(" {} custom 50000 75 1024", args[0]);
println!(" REDIS_PASSWORD=mypass {} standard", args[0]);
return Ok(());
}
// Get connection details from environment or use defaults
let host = env::var("REDIS_HOST").unwrap_or_else(|_| "127.0.0.1".to_string());
let port = env::var("REDIS_PORT").unwrap_or_else(|_| "6379".to_string());
let password = env::var("REDIS_PASSWORD").unwrap_or_else(|_| "".to_string());
let addr = format!("{}:{}", host, port);
let tester = StressTester::new(addr.clone(), "".to_string(), password.clone());
println!("🔧 Connecting to Redis server at {}...", addr);
if password != "" {
println!(" Using password authentication");
}
match RedisClient::connect_with_auth(&addr, &password).await {
Ok(mut client) => {
println!("✓ Connection successful!");
// Test connection with PING
match client.ping().await {
Ok(response) => println!("✓ Server responded: {}", response),
Err(e) => {
println!("⚠ Warning: PING failed: {}", e);
println!(" Connection might still work for basic operations");
}
}
// Optional: Clear database for clean benchmark
if env::var("REDIS_FLUSH_DB").unwrap_or_else(|_| "false".to_string()) == "true" {
println!("🧹 Flushing database...");
match client.flush_db().await {
Ok(_) => println!("✓ Database flushed"),
Err(e) => println!("⚠ Warning: Failed to flush database: {}", e),
}
}
},
Err(e) => {
println!("✗ Failed to connect: {}", e);
println!("Make sure the Redis server is running on {}", addr);
println!("Check your connection details and authentication:");
println!(" REDIS_HOST=your_host REDIS_PORT=your_port {} <test_type>", args[0]);
if password == "" {
println!(" If authentication is required:");
println!(" REDIS_PASSWORD=your_pass {} <test_type>", args[0]);
}
return Err(e);
}
}
println!();
let (ops, clients, value_size) = match args[1].as_str() {
"quick" => (1_000, 10, 256),
"standard" => (10_000, 50, 256),
"intensive" => (100_000, 100, 256),
"extreme" => (1_000_000, 200, 256),
"debug" => (100, 5, 64),
"custom" => {
if args.len() < 5 {
return Err("Custom test requires: custom <ops> <clients> <value_size>".to_string());
}
let ops = args[2].parse().map_err(|_| "Invalid operations count")?;
let clients = args[3].parse().map_err(|_| "Invalid client count")?;
let value_size = args[4].parse().map_err(|_| "Invalid value size")?;
(ops, clients, value_size)
},
_ => return Err("Unknown test type. Use: quick, standard, intensive, extreme, debug, or custom".to_string()),
};
println!("🎯 Test Configuration:");
println!(" Total operations: {}", ops);
println!(" Concurrent clients: {}", clients);
println!(" Value size: {} bytes", value_size);
println!();
// Prepare some data for read tests
println!("📝 Preparing test data...");
let mut prep_client = RedisClient::connect_with_auth(&addr, &password).await?;
let prep_value = vec![0x42u8; value_size];
for i in 0..10 {
for j in 0..100 {
let key = format!("worker{}:key{}", i, j);
let _ = prep_client.set(&key, &prep_value).await;
}
}
println!("✓ Test data prepared");
println!();
// Run benchmarks
let mut results = Vec::new();
// SET benchmark
match tester.redis_benchmark_set(ops, clients, value_size).await {
Ok(result) => {
result.print();
results.push(result);
},
Err(e) => println!("SET benchmark failed: {}", e),
}
// GET benchmark
match tester.redis_benchmark_get(ops, clients).await {
Ok(result) => {
result.print();
results.push(result);
},
Err(e) => println!("GET benchmark failed: {}", e),
}
// MIXED benchmark
match tester.redis_benchmark_mixed(ops, clients, value_size).await {
Ok(result) => {
result.print();
results.push(result);
},
Err(e) => println!("MIXED benchmark failed: {}", e),
}
// Summary
println!("📈 SUMMARY");
println!("==========");
for result in &results {
let error_rate = if result.total_operations + result.errors > 0 {
format!("{:.1}%", (result.errors as f64 / (result.total_operations + result.errors) as f64) * 100.0)
} else {
"0.0%".to_string()
};
println!("{:8} | {:>10.0} ops/sec | {:>8.2}μs avg latency | {} errors ({})",
result.operation,
result.ops_per_second,
result.avg_latency_us,
result.errors,
error_rate
);
}
if let Some(best) = results.iter().filter(|r| r.ops_per_second > 0.0).max_by(|a, b| a.ops_per_second.partial_cmp(&b.ops_per_second).unwrap()) {
println!();
println!("🏆 Best performance: {} with {:.0} ops/sec", best.operation, best.ops_per_second);
}
// Additional Redis-specific information
println!();
println!("📊 Redis Connection Info:");
println!(" Server: {}", addr);
println!(" Authentication: {}", if password != "" { "Yes" } else { "No" });
println!();
println!("💡 Tips for better Redis performance:");
println!(" - Use Redis pipelining for batch operations");
println!(" - Consider Redis Cluster for horizontal scaling");
println!(" - Monitor memory usage with 'redis-cli info memory'");
println!(" - Use appropriate data structures (strings, hashes, sets, etc.)");
Ok(())
}

161
bin/skytable.rs Normal file
View File

@ -0,0 +1,161 @@
use benchmark_oversdb::{StressTester, SkytableClient};
use std::env;
#[tokio::main]
async fn main() -> Result<(), String> {
let args: Vec<String> = std::env::args().collect();
if args.len() < 2 {
println!("Skytable Stress Test & Benchmark Tool");
println!("=====================================");
println!();
println!("Usage: {} <test_type> [options]", args[0]);
println!();
println!("Test types:");
println!(" quick - Quick test (1000 ops, 10 clients)");
println!(" standard - Standard test (10000 ops, 50 clients)");
println!(" intensive - Intensive test (100000 ops, 100 clients)");
println!(" extreme - Extreme test (1000000 ops, 200 clients)");
println!(" debug - Debug test (100 ops, 5 clients)");
println!(" custom <ops> <clients> <value_size> - Custom parameters");
println!();
println!("Environment variables:");
println!(" SKYTABLE_HOST - Server host (default: 127.0.0.1)");
println!(" SKYTABLE_USERNAME - Username (default: root)");
println!(" SKYTABLE_PASSWORD - Password (default: password)");
println!();
println!("Examples:");
println!(" {} quick", args[0]);
println!(" {} custom 50000 75 1024", args[0]);
println!(" SKYTABLE_PASSWORD=mypass {} standard", args[0]);
return Ok(());
}
// Get connection details from environment or use defaults
let host = env::var("SKYTABLE_HOST").unwrap_or_else(|_| "127.0.0.1".to_string());
let username = env::var("SKYTABLE_USERNAME").unwrap_or_else(|_| "root".to_string());
let password = env::var("SKYTABLE_PASSWORD").unwrap_or_else(|_| "password".to_string());
let tester = StressTester::new(host.clone(), username.clone(), password.clone());
println!("🔧 Connecting to Skytable server at {}:2003...", host);
println!(" Using credentials: {}/{}", username, password);
match SkytableClient::connect_with_auth(&host, &username, &password).await {
Ok(mut client) => {
println!("✓ Connection successful!");
// Setup benchmark space
match client.setup_benchmark_space().await {
Ok(_) => println!("✓ Benchmark space setup complete!"),
Err(e) => {
println!("⚠ Warning: Benchmark space setup failed: {}", e);
println!(" This might be normal if using basic key-value mode");
}
}
},
Err(e) => {
println!("✗ Failed to connect: {}", e);
println!("Make sure the Skytable server is running on {}:2003", host);
println!("and that you have the correct credentials");
println!("You can set credentials using environment variables:");
println!(" SKYTABLE_USERNAME=your_user SKYTABLE_PASSWORD=your_pass {} <test_type>", args[0]);
return Err(e);
}
}
println!();
let (ops, clients, value_size) = match args[1].as_str() {
"quick" => (1_000, 10, 256),
"standard" => (10_000, 50, 256),
"intensive" => (100_000, 100, 256),
"extreme" => (1_000_000, 200, 256),
"debug" => (100, 5, 64),
"custom" => {
if args.len() < 5 {
return Err("Custom test requires: custom <ops> <clients> <value_size>".to_string());
}
let ops = args[2].parse().map_err(|_| "Invalid operations count")?;
let clients = args[3].parse().map_err(|_| "Invalid client count")?;
let value_size = args[4].parse().map_err(|_| "Invalid value size")?;
(ops, clients, value_size)
},
_ => return Err("Unknown test type. Use: quick, standard, intensive, extreme, debug, or custom".to_string()),
};
println!("🎯 Test Configuration:");
println!(" Total operations: {}", ops);
println!(" Concurrent clients: {}", clients);
println!(" Value size: {} bytes", value_size);
println!();
// Prepare some data for read tests
println!("📝 Preparing test data...");
let mut prep_client = SkytableClient::connect_with_auth(&host, &username, &password).await?;
prep_client.setup_benchmark_space().await.ok(); // Ignore errors
let prep_value = vec![0x42u8; value_size];
for i in 0..10 {
for j in 0..100 {
let key = format!("worker{}:key{}", i, j);
let _ = prep_client.set(&key, &prep_value).await;
}
}
println!("✓ Test data prepared");
println!();
// Run benchmarks
let mut results = Vec::new();
// SET benchmark
match tester.skytable_benchmark_set(ops, clients, value_size).await {
Ok(result) => {
result.print();
results.push(result);
},
Err(e) => println!("SET benchmark failed: {}", e),
}
// GET benchmark
match tester.skytable_benchmark_get(ops, clients).await {
Ok(result) => {
result.print();
results.push(result);
},
Err(e) => println!("GET benchmark failed: {}", e),
}
// MIXED benchmark
match tester.skytable_benchmark_mixed(ops, clients, value_size).await {
Ok(result) => {
result.print();
results.push(result);
},
Err(e) => println!("MIXED benchmark failed: {}", e),
}
// Summary
println!("📈 SUMMARY");
println!("==========");
for result in &results {
let error_rate = if result.total_operations + result.errors > 0 {
format!("{:.1}%", (result.errors as f64 / (result.total_operations + result.errors) as f64) * 100.0)
} else {
"0.0%".to_string()
};
println!("{:8} | {:>10.0} ops/sec | {:>8.2}μs avg latency | {} errors ({})",
result.operation,
result.ops_per_second,
result.avg_latency_us,
result.errors,
error_rate
);
}
if let Some(best) = results.iter().filter(|r| r.ops_per_second > 0.0).max_by(|a, b| a.ops_per_second.partial_cmp(&b.ops_per_second).unwrap()) {
println!();
println!("🏆 Best performance: {} with {:.0} ops/sec", best.operation, best.ops_per_second);
}
Ok(())
}

1072
src/lib.rs Normal file

File diff suppressed because it is too large Load Diff

131
src/oversdb.rs Normal file
View File

@ -0,0 +1,131 @@
use tokio::net::TcpStream;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[derive(Debug, Clone)]
pub enum CrudOperation {
Create = 0,
Read = 1,
Update = 2,
Delete = 3,
}
pub struct OversDBClient {
stream: TcpStream,
}
impl OversDBClient {
pub async fn connect(addr: &str) -> Result<Self, String> {
let stream = TcpStream::connect(addr).await.map_err(|e| e.to_string())?;
Ok(Self { stream })
}
pub async fn send_packet(
&mut self,
operation: CrudOperation,
db_name: &str,
table_name: &str,
key: &[u8],
payload: &[u8],
) -> Result<Vec<u8>, String> {
// Build packet according to protocol
let mut packet = Vec::new();
// [2 bits CRUD][6 bits reserved]
packet.push((operation.clone() as u8) << 6);
// [1 byte db_len][N bytes db_name]
packet.push(db_name.len() as u8);
packet.extend_from_slice(db_name.as_bytes());
// [1 byte table_len][N bytes table_name]
packet.push(table_name.len() as u8);
packet.extend_from_slice(table_name.as_bytes());
// [2 bytes key_len][N bytes key]
packet.extend_from_slice(&(key.len() as u16).to_be_bytes());
packet.extend_from_slice(key);
// [4 bytes payload_len][N bytes payload]
packet.extend_from_slice(&(payload.len() as u32).to_be_bytes());
packet.extend_from_slice(payload);
// Send packet
self.stream.write_all(&packet).await.map_err(|e| e.to_string())?;
// Read response
match operation {
CrudOperation::Read => {
// Read success byte
let mut success = [0u8; 1];
self.stream.read_exact(&mut success).await.map_err(|e| e.to_string())?;
if success[0] == 1 {
// Read value length
let mut len_bytes = [0u8; 4];
self.stream.read_exact(&mut len_bytes).await.map_err(|e| e.to_string())?;
let len = u32::from_be_bytes(len_bytes) as usize;
// Read value
let mut value = vec![0u8; len];
self.stream.read_exact(&mut value).await.map_err(|e| e.to_string())?;
Ok(value)
} else {
Err("Key not found".to_string())
}
},
_ => {
// Read success byte for Create/Update/Delete
let mut success = [0u8; 1];
self.stream.read_exact(&mut success).await.map_err(|e| e.to_string())?;
if success[0] == 1 {
Ok(vec![1])
} else {
Err("Operation failed".to_string())
}
}
}
}
pub async fn create(&mut self, db_name: &str, table_name: &str, key: &[u8], value: &[u8], ttl: u64) -> Result<(), String> {
let mut payload = Vec::new();
payload.extend_from_slice(&ttl.to_be_bytes());
payload.extend_from_slice(value);
let result = self.send_packet(CrudOperation::Create, db_name, table_name, key, &payload).await?;
if result[0] == 1 {
Ok(())
} else {
Err("Create failed".to_string())
}
}
pub async fn read(&mut self, db_name: &str, table_name: &str, key: &[u8]) -> Result<Vec<u8>, String> {
self.send_packet(CrudOperation::Read, db_name, table_name, key, &[]).await
}
pub async fn update(&mut self, db_name: &str, table_name: &str, key: &[u8], value: &[u8], ttl: u64) -> Result<(), String> {
let mut payload = Vec::new();
payload.extend_from_slice(&ttl.to_be_bytes());
payload.extend_from_slice(value);
let result = self.send_packet(CrudOperation::Update, db_name, table_name, key, &payload).await?;
if result[0] == 1 {
Ok(())
} else {
Err("Update failed".to_string())
}
}
pub async fn delete(&mut self, db_name: &str, table_name: &str, key: &[u8]) -> Result<(), String> {
let result = self.send_packet(CrudOperation::Delete, db_name, table_name, key, &[]).await?;
if result[0] == 1 {
Ok(())
} else {
Err("Delete failed".to_string())
}
}
}

283
src/redis_client.rs Normal file
View File

@ -0,0 +1,283 @@
use redis::{Client, Connection, Commands, RedisResult, ConnectionLike, Cmd};
use tokio::task;
#[derive(Debug, Clone)]
pub enum RedisOperation {
Set,
Get,
Update,
Delete,
}
pub struct RedisClient {
client: Client,
connection: Option<Connection>,
}
impl RedisClient {
pub async fn connect(addr: &str, password: &str) -> Result<Self, String> {
Self::connect_with_auth(addr, password).await
}
pub async fn connect_with_auth(addr: &str, password: &str) -> Result<Self, String> {
let addr_clone = addr.to_string();
let password_clone = password.to_string();
// Create connection in blocking task since redis-rs is synchronous
let (client, connection) = task::spawn_blocking(move || {
let redis_url = if password_clone != "" {
format!("redis://:{}@{}", password_clone, addr_clone)
} else {
format!("redis://{}", addr_clone)
};
let client = Client::open(redis_url)
.map_err(|e| format!("Failed to create Redis client: {}", e))?;
let connection = client.get_connection()
.map_err(|e| format!("Failed to connect to Redis: {}", e))?;
Ok::<(Client, Connection), String>((client, connection))
})
.await
.map_err(|e| format!("Task join error: {}", e))??;
Ok(Self {
client,
connection: Some(connection),
})
}
pub async fn set(&mut self, key: &str, value: &[u8]) -> Result<(), String> {
let key = key.to_string();
let value = value.to_vec();
let mut conn = self.connection.take().unwrap();
let result = task::spawn_blocking(move || {
// Use binary-safe SET command
let result: RedisResult<()> = conn.set(&key, &value);
(conn, result)
})
.await
.map_err(|e| format!("Task join error: {}", e))?;
self.connection = Some(result.0);
result.1.map_err(|e| format!("SET failed: {}", e))
}
pub async fn get(&mut self, key: &str) -> Result<Vec<u8>, String> {
let key = key.to_string();
let mut conn = self.connection.take().unwrap();
let result = task::spawn_blocking(move || {
// Use binary-safe GET command
let result: RedisResult<Vec<u8>> = conn.get(&key);
(conn, result)
})
.await
.map_err(|e| format!("Task join error: {}", e))?;
self.connection = Some(result.0);
result.1.map_err(|e| format!("GET failed: {}", e))
}
pub async fn update(&mut self, key: &str, value: &[u8]) -> Result<(), String> {
// Redis UPDATE is the same as SET - it overwrites existing values
self.set(key, value).await
}
pub async fn delete(&mut self, key: &str) -> Result<(), String> {
let key = key.to_string();
let mut conn = self.connection.take().unwrap();
let result = task::spawn_blocking(move || {
let result: RedisResult<i32> = conn.del(&key);
(conn, result)
})
.await
.map_err(|e| format!("Task join error: {}", e))?;
self.connection = Some(result.0);
result.1.map(|_| ()).map_err(|e| format!("DELETE failed: {}", e))
}
pub async fn exists(&mut self, key: &str) -> Result<bool, String> {
let key = key.to_string();
let mut conn = self.connection.take().unwrap();
let result = task::spawn_blocking(move || {
let result: RedisResult<bool> = conn.exists(&key);
(conn, result)
})
.await
.map_err(|e| format!("Task join error: {}", e))?;
self.connection = Some(result.0);
result.1.map_err(|e| format!("EXISTS failed: {}", e))
}
pub async fn ping(&mut self) -> Result<String, String> {
let mut conn = self.connection.take().unwrap();
let result = task::spawn_blocking(move || {
let result: RedisResult<String> = redis::cmd("PING").query(&mut conn);
(conn, result)
})
.await
.map_err(|e| format!("Task join error: {}", e))?;
self.connection = Some(result.0);
result.1.map_err(|e| format!("PING failed: {}", e))
}
pub async fn flush_db(&mut self) -> Result<(), String> {
let mut conn = self.connection.take().unwrap();
let result = task::spawn_blocking(move || {
let result: RedisResult<()> = redis::cmd("FLUSHDB").query(&mut conn);
(conn, result)
})
.await
.map_err(|e| format!("Task join error: {}", e))?;
self.connection = Some(result.0);
result.1.map_err(|e| format!("FLUSHDB failed: {}", e))
}
// Additional utility methods for benchmarking
pub async fn set_with_expiry(&mut self, key: &str, value: &[u8], ttl_seconds: u64) -> Result<(), String> {
let key = key.to_string();
let value = value.to_vec();
let mut conn = self.connection.take().unwrap();
let result = task::spawn_blocking(move || {
let result: RedisResult<()> = conn.set_ex(&key, &value, ttl_seconds);
(conn, result)
})
.await
.map_err(|e| format!("Task join error: {}", e))?;
self.connection = Some(result.0);
result.1.map_err(|e| format!("SET with expiry failed: {}", e))
}
pub async fn mset(&mut self, pairs: &[(&str, &[u8])]) -> Result<(), String> {
if pairs.is_empty() {
return Ok(());
}
let pairs: Vec<(String, Vec<u8>)> = pairs.iter()
.map(|(k, v)| (k.to_string(), v.to_vec()))
.collect();
let mut conn = self.connection.take().unwrap();
let result = task::spawn_blocking(move || {
let mut cmd = redis::cmd("MSET");
for (key, value) in &pairs {
cmd.arg(key).arg(value);
}
let result: RedisResult<()> = cmd.query(&mut conn);
(conn, result)
})
.await
.map_err(|e| format!("Task join error: {}", e))?;
self.connection = Some(result.0);
result.1.map_err(|e| format!("MSET failed: {}", e))
}
pub async fn mget(&mut self, keys: &[&str]) -> Result<Vec<Option<Vec<u8>>>, String> {
if keys.is_empty() {
return Ok(Vec::new());
}
let keys: Vec<String> = keys.iter().map(|k| k.to_string()).collect();
let mut conn = self.connection.take().unwrap();
let result = task::spawn_blocking(move || {
let result: RedisResult<Vec<Option<Vec<u8>>>> = conn.get(&keys);
(conn, result)
})
.await
.map_err(|e| format!("Task join error: {}", e))?;
self.connection = Some(result.0);
result.1.map_err(|e| format!("MGET failed: {}", e))
}
pub async fn incr(&mut self, key: &str) -> Result<i64, String> {
let key = key.to_string();
let mut conn = self.connection.take().unwrap();
let result = task::spawn_blocking(move || {
let result: RedisResult<i64> = conn.incr(&key, 1);
(conn, result)
})
.await
.map_err(|e| format!("Task join error: {}", e))?;
self.connection = Some(result.0);
result.1.map_err(|e| format!("INCR failed: {}", e))
}
pub async fn ttl(&mut self, key: &str) -> Result<i64, String> {
let key = key.to_string();
let mut conn = self.connection.take().unwrap();
let result = task::spawn_blocking(move || {
let result: RedisResult<i64> = redis::cmd("TTL").arg(&key).query(&mut conn);
(conn, result)
})
.await
.map_err(|e| format!("Task join error: {}", e))?;
self.connection = Some(result.0);
result.1.map_err(|e| format!("TTL failed: {}", e))
}
// Get connection info for benchmarking
pub async fn info(&mut self, section: Option<&str>) -> Result<String, String> {
let section = section.map(|s| s.to_string());
let mut conn = self.connection.take().unwrap();
let result = task::spawn_blocking(move || {
let result: RedisResult<String> = match section {
Some(s) => redis::cmd("INFO").arg(s).query(&mut conn),
None => redis::cmd("INFO").query(&mut conn),
};
(conn, result)
})
.await
.map_err(|e| format!("Task join error: {}", e))?;
self.connection = Some(result.0);
result.1.map_err(|e| format!("INFO failed: {}", e))
}
// Pipeline for batch operations (advanced feature)
pub async fn pipeline_set(&mut self, operations: &[(&str, &[u8])]) -> Result<Vec<bool>, String> {
if operations.is_empty() {
return Ok(Vec::new());
}
let operations: Vec<(String, Vec<u8>)> = operations.iter()
.map(|(k, v)| (k.to_string(), v.to_vec()))
.collect();
let mut conn = self.connection.take().unwrap();
let result = task::spawn_blocking(move || {
let mut pipe = redis::pipe();
let mut pipe = pipe.atomic();
for (key, value) in &operations {
pipe = pipe.set(key, value);
}
let result: RedisResult<Vec<bool>> = pipe.query(&mut conn);
(conn, result)
})
.await
.map_err(|e| format!("Task join error: {}", e))?;
self.connection = Some(result.0);
result.1.map_err(|e| format!("Pipeline SET failed: {}", e))
}
}

135
src/skytable_client.rs Normal file
View File

@ -0,0 +1,135 @@
use skytable::{Config, query};
use tokio::task;
#[derive(Debug, Clone)]
pub enum SkytableOperation {
Set,
Get,
Update,
Delete,
}
pub struct SkytableClient {
connection: Option<skytable::Connection>,
}
impl SkytableClient {
pub async fn connect(addr: &str, login: &str, password: &str) -> Result<Self, String> {
Self::connect_with_auth(addr, login, password).await
}
pub async fn connect_with_auth(addr: &str, username: &str, password: &str) -> Result<Self, String> {
let addr_clone = addr.to_string();
let username_clone = username.to_string();
let password_clone = password.to_string();
// Create connection in blocking task since Skytable is synchronous
let connection = task::spawn_blocking(move || {
Config::new(&addr_clone, 2003, &username_clone, &password_clone).connect()
})
.await
.map_err(|e| format!("Task join error: {}", e))?
.map_err(|e| format!("Failed to connect to Skytable: {}", e))?;
Ok(Self {
connection: Some(connection),
})
}
pub async fn set(&mut self, key: &str, value: &[u8]) -> Result<(), String> {
let key = key.to_string();
let value = value.to_vec();
let mut conn = self.connection.take().unwrap();
let result = task::spawn_blocking(move || {
let q = query!("INSERT INTO benchmark.kvstore (?, ?)", key, value);
let result = conn.query_parse::<()>(&q);
(conn, result)
})
.await
.map_err(|e| format!("Task join error: {}", e))?;
self.connection = Some(result.0);
result.1.map_err(|e| format!("SET failed: {}", e))
}
pub async fn get(&mut self, key: &str) -> Result<(Vec<u8>,), String> {
let key = key.to_string();
let mut conn = self.connection.take().unwrap();
let result = task::spawn_blocking(move || {
let q = query!("SELECT v FROM benchmark.kvstore WHERE k = ?", key);
let result = conn.query_parse::<(Vec<u8>,)>(&q);
(conn, result)
})
.await
.map_err(|e| format!("Task join error: {}", e))?;
self.connection = Some(result.0);
result.1.map_err(|e| format!("GET failed: {}", e))
}
pub async fn update(&mut self, key: &str, value: &[u8]) -> Result<(), String> {
let key = key.to_string();
let value = value.to_vec();
let mut conn = self.connection.take().unwrap();
let result = task::spawn_blocking(move || {
let q = query!("UPDATE benchmark.kvstore SET v = ? WHERE k = ?", value, key);
let result = conn.query_parse::<()>(&q);
(conn, result)
})
.await
.map_err(|e| format!("Task join error: {}", e))?;
self.connection = Some(result.0);
result.1.map_err(|e| format!("UPDATE failed: {}", e))
}
pub async fn delete(&mut self, key: &str) -> Result<(), String> {
let key = key.to_string();
let mut conn = self.connection.take().unwrap();
let result = task::spawn_blocking(move || {
let q = query!("DELETE FROM benchmark.kvstore WHERE k = ?", key);
let result = conn.query_parse::<()>(&q);
(conn, result)
})
.await
.map_err(|e| format!("Task join error: {}", e))?;
self.connection = Some(result.0);
result.1.map_err(|e| format!("DELETE failed: {}", e))
}
pub async fn setup_benchmark_space(&mut self) -> Result<(), String> {
let mut conn = self.connection.take().unwrap();
let result = task::spawn_blocking(move || {
// Create space
let create_space = query!("CREATE SPACE IF NOT EXISTS benchmark");
let _ = conn.query_parse::<bool>(&create_space); // Ignore errors if exists
// Create model with binary support
let create_model = query!("CREATE MODEL IF NOT EXISTS benchmark.kvstore (k: string, v: binary)");
let result = conn.query_parse::<bool>(&create_model);
(conn, result)
})
.await
.map_err(|e| format!("Task join error: {}", e))?;
self.connection = Some(result.0);
// Ignore errors if space/model already exists
match result.1 {
Ok(_) => Ok(()),
Err(e) => {
if e.to_string().contains("already exists") || e.to_string().contains("exists") {
Ok(())
} else {
Err(format!("Setup failed: {}", e))
}
}
}
}
}