commit 0679c3a7332aa2d986edbbc9ae511e326a661027 Author: BitHeaven Date: Wed Aug 6 06:33:11 2025 +0000 first commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9c12ebd --- /dev/null +++ b/.gitignore @@ -0,0 +1,21 @@ +# ---> Rust +# Generated by Cargo +# will have compiled files and executables +debug/ +target/ + +# These are backup files generated by rustfmt +**/*.rs.bk + +# MSVC Windows builds of rustc generate these, which store debugging information +*.pdb + +# Database files +*.db + +# RustRover +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. +#.idea/ diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..eed2535 --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,1218 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 4 + +[[package]] +name = "addr2line" +version = "0.24.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dfbe277e56a376000877090da837660b4427aad530e3028d44e0bffe4f89a1c1" +dependencies = [ + "gimli", +] + +[[package]] +name = "adler2" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" + +[[package]] +name = "async-trait" +version = "0.1.88" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e539d3fca749fcee5236ab05e93a52867dd549cc157c8cb7f99595f3cedffdb5" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.104", +] + +[[package]] +name = "autocfg" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" + +[[package]] +name = "backtrace" +version = "0.3.75" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6806a6321ec58106fea15becdad98371e28d92ccbc7c8f1b3b6dd724fe8f1002" +dependencies = [ + "addr2line", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", + "windows-targets 0.52.6", +] + +[[package]] +name = "bb8" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "212d8b8e1a22743d9241575c6ba822cf9c8fef34771c86ab7e477a4fbfd254e5" +dependencies = [ + "futures-util", + "parking_lot", + "tokio", +] + +[[package]] +name = "bitflags" +version = "2.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b8e56985ec62d17e9c1001dc89c88ecd7dc08e47eba5ec7c29c7b5eeecde967" + +[[package]] +name = "bytes" +version = "1.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" + +[[package]] +name = "cc" +version = "1.2.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "deec109607ca693028562ed836a5f1c4b8bd77755c4e132fc5ce11b0b6211ae7" +dependencies = [ + "shlex", +] + +[[package]] +name = "cfg-if" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9555578bc9e57714c812a1f84e4fc5b4d21fcb063490c624de019f7464c91268" + +[[package]] +name = "combine" +version = "4.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd" +dependencies = [ + "bytes", + "futures-core", + "memchr", + "pin-project-lite", + "tokio", + "tokio-util", +] + +[[package]] +name = "core-foundation" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f" +dependencies = [ + "core-foundation-sys", + "libc", +] + +[[package]] +name = "core-foundation-sys" +version = "0.8.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" + +[[package]] +name = "displaydoc" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.104", +] + +[[package]] +name = "errno" +version = "0.3.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "778e2ac28f6c47af28e4907f13ffd1e1ddbd400980a9abd7c8df189bf578a5ad" +dependencies = [ + "libc", + "windows-sys 0.60.2", +] + +[[package]] +name = "fastrand" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" + +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + +[[package]] +name = "form_urlencoded" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13624c2627564efccf4934284bdd98cbaa14e79b0b5a141218e507b3a823456" +dependencies = [ + "percent-encoding", +] + +[[package]] +name = "futures-core" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" + +[[package]] +name = "futures-sink" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" + +[[package]] +name = "futures-task" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" + +[[package]] +name = "futures-util" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" +dependencies = [ + "futures-core", + "futures-sink", + "futures-task", + "pin-project-lite", + "pin-utils", + "slab", +] + +[[package]] +name = "getrandom" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26145e563e54f2cadc477553f1ec5ee650b00862f0a58bcd12cbdc5f0ea2d2f4" +dependencies = [ + "cfg-if", + "libc", + "r-efi", + "wasi 0.14.2+wasi-0.2.4", +] + +[[package]] +name = "gimli" +version = "0.31.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" + +[[package]] +name = "icu_collections" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "200072f5d0e3614556f94a9930d5dc3e0662a652823904c3a75dc3b0af7fee47" +dependencies = [ + "displaydoc", + "potential_utf", + "yoke", + "zerofrom", + "zerovec", +] + +[[package]] +name = "icu_locale_core" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0cde2700ccaed3872079a65fb1a78f6c0a36c91570f28755dda67bc8f7d9f00a" +dependencies = [ + "displaydoc", + "litemap", + "tinystr", + "writeable", + "zerovec", +] + +[[package]] +name = "icu_normalizer" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "436880e8e18df4d7bbc06d58432329d6458cc84531f7ac5f024e93deadb37979" +dependencies = [ + "displaydoc", + "icu_collections", + "icu_normalizer_data", + "icu_properties", + "icu_provider", + "smallvec", + "zerovec", +] + +[[package]] +name = "icu_normalizer_data" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00210d6893afc98edb752b664b8890f0ef174c8adbb8d0be9710fa66fbbf72d3" + +[[package]] +name = "icu_properties" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "016c619c1eeb94efb86809b015c58f479963de65bdb6253345c1a1276f22e32b" +dependencies = [ + "displaydoc", + "icu_collections", + "icu_locale_core", + "icu_properties_data", + "icu_provider", + "potential_utf", + "zerotrie", + "zerovec", +] + +[[package]] +name = "icu_properties_data" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "298459143998310acd25ffe6810ed544932242d3f07083eee1084d83a71bd632" + +[[package]] +name = "icu_provider" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03c80da27b5f4187909049ee2d72f276f0d9f99a42c306bd0131ecfe04d8e5af" +dependencies = [ + "displaydoc", + "icu_locale_core", + "stable_deref_trait", + "tinystr", + "writeable", + "yoke", + "zerofrom", + "zerotrie", + "zerovec", +] + +[[package]] +name = "idna" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "686f825264d630750a544639377bae737628043f20d38bbc029e8f29ea968a7e" +dependencies = [ + "idna_adapter", + "smallvec", + "utf8_iter", +] + +[[package]] +name = "idna_adapter" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3acae9609540aa318d1bc588455225fb2085b9ed0c4f6bd0d9d5bcd86f1a0344" +dependencies = [ + "icu_normalizer", + "icu_properties", +] + +[[package]] +name = "io-uring" +version = "0.7.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d93587f37623a1a17d94ef2bc9ada592f5465fe7732084ab7beefabe5c77c0c4" +dependencies = [ + "bitflags", + "cfg-if", + "libc", +] + +[[package]] +name = "itoa" +version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" + +[[package]] +name = "libc" +version = "0.2.174" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1171693293099992e19cddea4e8b849964e9846f4acee11b3948bcc337be8776" + +[[package]] +name = "linux-raw-sys" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd945864f07fe9f5371a27ad7b52a172b4b499999f1d97574c9fa68373937e12" + +[[package]] +name = "litemap" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "241eaef5fd12c88705a01fc1066c48c4b36e0dd4377dcdc7ec3942cea7a69956" + +[[package]] +name = "lock_api" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96936507f153605bddfcda068dd804796c84324ed2510809e5b2a624c81da765" +dependencies = [ + "autocfg", + "scopeguard", +] + +[[package]] +name = "log" +version = "0.4.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94" + +[[package]] +name = "memchr" +version = "2.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a282da65faaf38286cf3be983213fcf1d2e2a58700e808f83f4ea9a4804bc0" + +[[package]] +name = "miniz_oxide" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fa76a2c86f704bdb222d66965fb3d63269ce38518b83cb0575fca855ebb6316" +dependencies = [ + "adler2", +] + +[[package]] +name = "mio" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78bed444cc8a2160f01cbcf811ef18cac863ad68ae8ca62092e8db51d51c761c" +dependencies = [ + "libc", + "wasi 0.11.1+wasi-snapshot-preview1", + "windows-sys 0.59.0", +] + +[[package]] +name = "native-tls" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87de3442987e9dbec73158d5c715e7ad9072fda936bb03d19d7fa10e00520f0e" +dependencies = [ + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] + +[[package]] +name = "num-bigint" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5e44f723f1133c9deac646763579fdb3ac745e418f2a7af9cd0c431da1f20b9" +dependencies = [ + "num-integer", + "num-traits", +] + +[[package]] +name = "num-integer" +version = "0.1.46" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7969661fd2958a5cb096e56c8e1ad0444ac2bbcd0061bd28660485a44879858f" +dependencies = [ + "num-traits", +] + +[[package]] +name = "num-traits" +version = "0.2.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" +dependencies = [ + "autocfg", +] + +[[package]] +name = "object" +version = "0.36.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62948e14d923ea95ea2c7c86c71013138b66525b86bdc08d2dcc262bdb497b87" +dependencies = [ + "memchr", +] + +[[package]] +name = "once_cell" +version = "1.21.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" + +[[package]] +name = "openssl" +version = "0.10.73" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8505734d46c8ab1e19a1dce3aef597ad87dcb4c37e7188231769bd6bd51cebf8" +dependencies = [ + "bitflags", + "cfg-if", + "foreign-types", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.104", +] + +[[package]] +name = "openssl-probe" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" + +[[package]] +name = "openssl-sys" +version = "0.9.109" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90096e2e47630d78b7d1c20952dc621f957103f8bc2c8359ec81290d75238571" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + +[[package]] +name = "parking_lot" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70d58bf43669b5795d1576d0641cfb6fbb2057bf629506267a92807158584a13" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc838d2a56b5b1a6c25f55575dfc605fabb63bb2365f6c2353ef9159aa69e4a5" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-targets 0.52.6", +] + +[[package]] +name = "percent-encoding" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" + +[[package]] +name = "pin-project-lite" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" + +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + +[[package]] +name = "pkg-config" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" + +[[package]] +name = "potential_utf" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5a7c30837279ca13e7c867e9e40053bc68740f988cb07f7ca6df43cc734b585" +dependencies = [ + "zerovec", +] + +[[package]] +name = "proc-macro2" +version = "1.0.95" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02b3e5e68a3a1a02aad3ec490a98007cbc13c37cbe84a3cd7b8e406d76e7f778" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1885c039570dc00dcb4ff087a89e185fd56bae234ddc7f056a945bf36467248d" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "r-efi" +version = "5.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" + +[[package]] +name = "r2d2" +version = "0.8.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93" +dependencies = [ + "log", + "parking_lot", + "scheduled-thread-pool", +] + +[[package]] +name = "redis" +version = "0.32.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1f66bf4cac9733a23bcdf1e0e01effbaaad208567beba68be8f67e5f4af3ee1" +dependencies = [ + "bytes", + "cfg-if", + "combine", + "futures-util", + "itoa", + "num-bigint", + "percent-encoding", + "pin-project-lite", + "ryu", + "sha1_smol", + "socket2 0.6.0", + "tokio", + "tokio-util", + "url", +] + +[[package]] +name = "redox_syscall" +version = "0.5.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e8af0dde094006011e6a740d4879319439489813bd0bcdc7d821beaeeff48ec" +dependencies = [ + "bitflags", +] + +[[package]] +name = "rustc-demangle" +version = "0.1.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "989e6739f80c4ad5b13e0fd7fe89531180375b18520cc8c82080e4dc4035b84f" + +[[package]] +name = "rustix" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11181fbabf243db407ef8df94a6ce0b2f9a733bd8be4ad02b4eda9602296cac8" +dependencies = [ + "bitflags", + "errno", + "libc", + "linux-raw-sys", + "windows-sys 0.60.2", +] + +[[package]] +name = "ryu" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" + +[[package]] +name = "schannel" +version = "0.1.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f29ebaa345f945cec9fbbc532eb307f0fdad8161f281b6369539c8d84876b3d" +dependencies = [ + "windows-sys 0.59.0", +] + +[[package]] +name = "scheduled-thread-pool" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19" +dependencies = [ + "parking_lot", +] + +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + +[[package]] +name = "security-framework" +version = "2.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" +dependencies = [ + "bitflags", + "core-foundation", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49db231d56a190491cb4aeda9527f1ad45345af50b0851622a7adb8c03b01c32" +dependencies = [ + "core-foundation-sys", + "libc", +] + +[[package]] +name = "serde" +version = "1.0.219" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.219" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.104", +] + +[[package]] +name = "sha1_smol" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbfa15b3dddfee50a0fff136974b3e1bde555604ba463834a7eb7deb6417705d" + +[[package]] +name = "shlex" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" + +[[package]] +name = "signal-hook-registry" +version = "1.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9203b8055f63a2a00e2f593bb0510367fe707d7ff1e5c872de2f537b339e5410" +dependencies = [ + "libc", +] + +[[package]] +name = "sky-derive" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "653a697e7fcd1ecf5f9af1d3277d6d48290f29f076cb0190011eb09cff2238ea" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + +[[package]] +name = "skytable" +version = "0.8.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "189d8a1fbc6887537e289556c6a8c536d0143b277cbad77001a184670e6c2a35" +dependencies = [ + "bb8", + "itoa", + "native-tls", + "r2d2", + "sky-derive", + "tokio", + "tokio-native-tls", +] + +[[package]] +name = "slab" +version = "0.4.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04dc19736151f35336d325007ac991178d504a119863a2fcb3758cdb5e52c50d" + +[[package]] +name = "smallvec" +version = "1.15.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" + +[[package]] +name = "socket2" +version = "0.5.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e22376abed350d73dd1cd119b57ffccad95b4e585a7cda43e286245ce23c0678" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + +[[package]] +name = "socket2" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "233504af464074f9d066d7b5416c5f9b894a5862a6506e306f7b816cdd6f1807" +dependencies = [ + "libc", + "windows-sys 0.59.0", +] + +[[package]] +name = "stable_deref_trait" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" + +[[package]] +name = "syn" +version = "1.0.109" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "syn" +version = "2.0.104" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "17b6f705963418cdb9927482fa304bc562ece2fdd4f616084c50b7023b435a40" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "synstructure" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "728a70f3dbaf5bab7f0c4b1ac8d7ae5ea60a4b5549c8a5914361c99147a709d2" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.104", +] + +[[package]] +name = "tempfile" +version = "3.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8a64e3985349f2441a1a9ef0b853f869006c3855f2cda6862a94d26ebb9d6a1" +dependencies = [ + "fastrand", + "getrandom", + "once_cell", + "rustix", + "windows-sys 0.59.0", +] + +[[package]] +name = "tesnig" +version = "0.1.0" +dependencies = [ + "async-trait", + "redis", + "skytable", + "tokio", +] + +[[package]] +name = "tinystr" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d4f6d1145dcb577acf783d4e601bc1d76a13337bb54e6233add580b07344c8b" +dependencies = [ + "displaydoc", + "zerovec", +] + +[[package]] +name = "tokio" +version = "1.46.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0cc3a2344dafbe23a245241fe8b09735b521110d30fcefbbd5feb1797ca35d17" +dependencies = [ + "backtrace", + "bytes", + "io-uring", + "libc", + "mio", + "parking_lot", + "pin-project-lite", + "signal-hook-registry", + "slab", + "socket2 0.5.10", + "tokio-macros", + "windows-sys 0.52.0", +] + +[[package]] +name = "tokio-macros" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e06d43f1345a3bcd39f6a56dbb7dcab2ba47e68e8ac134855e7e2bdbaf8cab8" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.104", +] + +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + +[[package]] +name = "tokio-util" +version = "0.7.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66a539a9ad6d5d281510d5bd368c973d636c02dbf8a67300bfb6b950696ad7df" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "unicode-ident" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512" + +[[package]] +name = "url" +version = "2.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32f8b686cadd1473f4bd0117a5d28d36b1ade384ea9b5069a1c40aefed7fda60" +dependencies = [ + "form_urlencoded", + "idna", + "percent-encoding", +] + +[[package]] +name = "utf8_iter" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" + +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + +[[package]] +name = "wasi" +version = "0.11.1+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b" + +[[package]] +name = "wasi" +version = "0.14.2+wasi-0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9683f9a5a998d873c0d21fcbe3c083009670149a8fab228644b8bd36b2c48cb3" +dependencies = [ + "wit-bindgen-rt", +] + +[[package]] +name = "windows-sys" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" +dependencies = [ + "windows-targets 0.52.6", +] + +[[package]] +name = "windows-sys" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" +dependencies = [ + "windows-targets 0.52.6", +] + +[[package]] +name = "windows-sys" +version = "0.60.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2f500e4d28234f72040990ec9d39e3a6b950f9f22d3dba18416c35882612bcb" +dependencies = [ + "windows-targets 0.53.2", +] + +[[package]] +name = "windows-targets" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" +dependencies = [ + "windows_aarch64_gnullvm 0.52.6", + "windows_aarch64_msvc 0.52.6", + "windows_i686_gnu 0.52.6", + "windows_i686_gnullvm 0.52.6", + "windows_i686_msvc 0.52.6", + "windows_x86_64_gnu 0.52.6", + "windows_x86_64_gnullvm 0.52.6", + "windows_x86_64_msvc 0.52.6", +] + +[[package]] +name = "windows-targets" +version = "0.53.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c66f69fcc9ce11da9966ddb31a40968cad001c5bedeb5c2b82ede4253ab48aef" +dependencies = [ + "windows_aarch64_gnullvm 0.53.0", + "windows_aarch64_msvc 0.53.0", + "windows_i686_gnu 0.53.0", + "windows_i686_gnullvm 0.53.0", + "windows_i686_msvc 0.53.0", + "windows_x86_64_gnu 0.53.0", + "windows_x86_64_gnullvm 0.53.0", + "windows_x86_64_msvc 0.53.0", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.53.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "86b8d5f90ddd19cb4a147a5fa63ca848db3df085e25fee3cc10b39b6eebae764" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.53.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7651a1f62a11b8cbd5e0d42526e55f2c99886c77e007179efff86c2b137e66c" + +[[package]] +name = "windows_i686_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" + +[[package]] +name = "windows_i686_gnu" +version = "0.53.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1dc67659d35f387f5f6c479dc4e28f1d4bb90ddd1a5d3da2e5d97b42d6272c3" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.53.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ce6ccbdedbf6d6354471319e781c0dfef054c81fbc7cf83f338a4296c0cae11" + +[[package]] +name = "windows_i686_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" + +[[package]] +name = "windows_i686_msvc" +version = "0.53.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "581fee95406bb13382d2f65cd4a908ca7b1e4c2f1917f143ba16efe98a589b5d" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.53.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e55b5ac9ea33f2fc1716d1742db15574fd6fc8dadc51caab1c16a3d3b4190ba" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.53.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0a6e035dd0599267ce1ee132e51c27dd29437f63325753051e71dd9e42406c57" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.53.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "271414315aff87387382ec3d271b52d7ae78726f5d44ac98b4f4030c91880486" + +[[package]] +name = "wit-bindgen-rt" +version = "0.39.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f42320e61fe2cfd34354ecb597f86f413484a798ba44a8ca1165c58d42da6c1" +dependencies = [ + "bitflags", +] + +[[package]] +name = "writeable" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea2f10b9bb0928dfb1b42b65e1f9e36f7f54dbdf08457afefb38afcdec4fa2bb" + +[[package]] +name = "yoke" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f41bb01b8226ef4bfd589436a297c53d118f65921786300e427be8d487695cc" +dependencies = [ + "serde", + "stable_deref_trait", + "yoke-derive", + "zerofrom", +] + +[[package]] +name = "yoke-derive" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38da3c9736e16c5d3c8c597a9aaa5d1fa565d0532ae05e27c24aa62fb32c0ab6" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.104", + "synstructure", +] + +[[package]] +name = "zerofrom" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50cc42e0333e05660c3587f3bf9d0478688e15d870fab3346451ce7f8c9fbea5" +dependencies = [ + "zerofrom-derive", +] + +[[package]] +name = "zerofrom-derive" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d71e5d6e06ab090c67b5e44993ec16b72dcbaabc526db883a360057678b48502" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.104", + "synstructure", +] + +[[package]] +name = "zerotrie" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "36f0bbd478583f79edad978b407914f61b2972f5af6fa089686016be8f9af595" +dependencies = [ + "displaydoc", + "yoke", + "zerofrom", +] + +[[package]] +name = "zerovec" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a05eb080e015ba39cc9e23bbe5e7fb04d5fb040350f99f34e338d5fdd294428" +dependencies = [ + "yoke", + "zerofrom", + "zerovec-derive", +] + +[[package]] +name = "zerovec-derive" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b96237efa0c878c64bd89c436f661be4e46b2f3eff1ebb976f7ef2321d2f58f" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.104", +] diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..8a3bddb --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "tesnig" +version = "0.1.0" +edition = "2024" + +[[bin]] +name = "oversdb" +path = "bin/oversdb.rs" + +[[bin]] +name = "skytable" +path = "bin/skytable.rs" + +[[bin]] +name = "redis" +path = "bin/redis.rs" + +[lib] +name = "benchmark_oversdb" +path = "src/lib.rs" + +[dependencies] +skytable = "0.8.12" +tokio = { version = "1.0", features = ["full"] } +async-trait = "0.1" +redis = { version = "0.32.4", features = ["tokio-comp"] } diff --git a/bin/oversdb.rs b/bin/oversdb.rs new file mode 100644 index 0000000..ec824d6 --- /dev/null +++ b/bin/oversdb.rs @@ -0,0 +1,133 @@ +use benchmark_oversdb::{StressTester, OversDBClient}; + +#[tokio::main] +async fn main() -> Result<(), String> { + let args: Vec = std::env::args().collect(); + + if args.len() < 2 { + println!("OversDB Stress Test & Benchmark Tool"); + println!("===================================="); + println!(); + println!("Usage: {} [options]", args[0]); + println!(); + println!("Test types:"); + println!(" quick - Quick test (1000 ops, 10 clients)"); + println!(" standard - Standard test (10000 ops, 50 clients)"); + println!(" intensive - Intensive test (100000 ops, 100 clients)"); + println!(" extreme - Extreme test (1000000 ops, 200 clients)"); + println!(" debug - Debug test (100 ops, 5 clients)"); + println!(" custom - Custom parameters"); + println!(); + println!("Examples:"); + println!(" {} quick", args[0]); + println!(" {} custom 50000 75 1024", args[0]); + return Ok(()); + } + + let tester = StressTester::new("167.99.33.194:6601".to_string()); + + println!("๐Ÿ”ง Connecting to OversDB server at 167.99.33.194:6601..."); + match OversDBClient::connect("167.99.33.194:6601").await { + Ok(_) => println!("โœ“ Connection successful!"), + Err(e) => { + println!("โœ— Failed to connect: {}", e); + println!("Make sure the OversDB server is running on 167.99.33.194:6601"); + return Err(e); + } + } + println!(); + + let (ops, clients, value_size) = match args[1].as_str() { + "quick" => (1_000, 10, 256), + "standard" => (10_000, 50, 256), + "intensive" => (100_000, 100, 256), + "extreme" => (1_000_000, 200, 256), + "debug" => (100, 5, 64), // Small debug test + "custom" => { + if args.len() < 5 { + return Err("Custom test requires: custom ".to_string()); + } + let ops = args[2].parse().map_err(|_| "Invalid operations count")?; + let clients = args[3].parse().map_err(|_| "Invalid client count")?; + let value_size = args[4].parse().map_err(|_| "Invalid value size")?; + (ops, clients, value_size) + }, + _ => return Err("Unknown test type. Use: quick, standard, intensive, extreme, debug, or custom".to_string()), + }; + + println!("๐ŸŽฏ Test Configuration:"); + println!(" Total operations: {}", ops); + println!(" Concurrent clients: {}", clients); + println!(" Value size: {} bytes", value_size); + println!(); + + // Prepare some data for read tests + println!("๐Ÿ“ Preparing test data..."); + let mut prep_client = OversDBClient::connect("167.99.33.194:6601").await?; + let prep_value = vec![0x42u8; value_size]; + + for i in 0..10 { + for j in 0..100 { + let key = format!("worker{}:key{}", i, j); + let _ = prep_client.create("benchmark", "test", key.as_bytes(), &prep_value, 0).await; + } + } + println!("โœ“ Test data prepared"); + println!(); + + // Run benchmarks + let mut results = Vec::new(); + + // CREATE benchmark + match tester.benchmark_create(ops, clients, value_size).await { + Ok(result) => { + result.print(); + results.push(result); + }, + Err(e) => println!("CREATE benchmark failed: {}", e), + } + + // READ benchmark + match tester.benchmark_read(ops, clients).await { + Ok(result) => { + result.print(); + results.push(result); + }, + Err(e) => println!("READ benchmark failed: {}", e), + } + + // MIXED benchmark + match tester.benchmark_mixed(ops, clients, value_size).await { + Ok(result) => { + result.print(); + results.push(result); + }, + Err(e) => println!("MIXED benchmark failed: {}", e), + } + + // Summary + println!("๐Ÿ“ˆ SUMMARY"); + println!("=========="); + for result in &results { + let error_rate = if result.total_operations + result.errors > 0 { + format!("{:.1}%", (result.errors as f64 / (result.total_operations + result.errors) as f64) * 100.0) + } else { + "0.0%".to_string() + }; + + println!("{:8} | {:>10.0} ops/sec | {:>8.2}ฮผs avg latency | {} errors ({})", + result.operation, + result.ops_per_second, + result.avg_latency_us, + result.errors, + error_rate + ); + } + + if let Some(best) = results.iter().filter(|r| r.ops_per_second > 0.0).max_by(|a, b| a.ops_per_second.partial_cmp(&b.ops_per_second).unwrap()) { + println!(); + println!("๐Ÿ† Best performance: {} with {:.0} ops/sec", best.operation, best.ops_per_second); + } + + Ok(()) +} diff --git a/bin/redis.rs b/bin/redis.rs new file mode 100644 index 0000000..e19e47c --- /dev/null +++ b/bin/redis.rs @@ -0,0 +1,188 @@ +use benchmark_oversdb::{StressTester, RedisClient}; +use std::env; + +#[tokio::main] +async fn main() -> Result<(), String> { + let args: Vec = std::env::args().collect(); + + if args.len() < 2 { + println!("Redis Stress Test & Benchmark Tool"); + println!("=================================="); + println!(); + println!("Usage: {} [options]", args[0]); + println!(); + println!("Test types:"); + println!(" quick - Quick test (1000 ops, 10 clients)"); + println!(" standard - Standard test (10000 ops, 50 clients)"); + println!(" intensive - Intensive test (100000 ops, 100 clients)"); + println!(" extreme - Extreme test (1000000 ops, 200 clients)"); + println!(" debug - Debug test (100 ops, 5 clients)"); + println!(" custom - Custom parameters"); + println!(); + println!("Environment variables:"); + println!(" REDIS_HOST - Server host (default: 127.0.0.1)"); + println!(" REDIS_PORT - Server port (default: 6379)"); + println!(" REDIS_PASSWORD - Password (optional)"); + println!(); + println!("Examples:"); + println!(" {} quick", args[0]); + println!(" {} custom 50000 75 1024", args[0]); + println!(" REDIS_PASSWORD=mypass {} standard", args[0]); + return Ok(()); + } + + // Get connection details from environment or use defaults + let host = env::var("REDIS_HOST").unwrap_or_else(|_| "127.0.0.1".to_string()); + let port = env::var("REDIS_PORT").unwrap_or_else(|_| "6379".to_string()); + let password = env::var("REDIS_PASSWORD").unwrap_or_else(|_| "".to_string()); + let addr = format!("{}:{}", host, port); + + let tester = StressTester::new(addr.clone(), "".to_string(), password.clone()); + + println!("๐Ÿ”ง Connecting to Redis server at {}...", addr); + if password != "" { + println!(" Using password authentication"); + } + + match RedisClient::connect_with_auth(&addr, &password).await { + Ok(mut client) => { + println!("โœ“ Connection successful!"); + + // Test connection with PING + match client.ping().await { + Ok(response) => println!("โœ“ Server responded: {}", response), + Err(e) => { + println!("โš  Warning: PING failed: {}", e); + println!(" Connection might still work for basic operations"); + } + } + + // Optional: Clear database for clean benchmark + if env::var("REDIS_FLUSH_DB").unwrap_or_else(|_| "false".to_string()) == "true" { + println!("๐Ÿงน Flushing database..."); + match client.flush_db().await { + Ok(_) => println!("โœ“ Database flushed"), + Err(e) => println!("โš  Warning: Failed to flush database: {}", e), + } + } + }, + Err(e) => { + println!("โœ— Failed to connect: {}", e); + println!("Make sure the Redis server is running on {}", addr); + println!("Check your connection details and authentication:"); + println!(" REDIS_HOST=your_host REDIS_PORT=your_port {} ", args[0]); + if password == "" { + println!(" If authentication is required:"); + println!(" REDIS_PASSWORD=your_pass {} ", args[0]); + } + return Err(e); + } + } + println!(); + + let (ops, clients, value_size) = match args[1].as_str() { + "quick" => (1_000, 10, 256), + "standard" => (10_000, 50, 256), + "intensive" => (100_000, 100, 256), + "extreme" => (1_000_000, 200, 256), + "debug" => (100, 5, 64), + "custom" => { + if args.len() < 5 { + return Err("Custom test requires: custom ".to_string()); + } + let ops = args[2].parse().map_err(|_| "Invalid operations count")?; + let clients = args[3].parse().map_err(|_| "Invalid client count")?; + let value_size = args[4].parse().map_err(|_| "Invalid value size")?; + (ops, clients, value_size) + }, + _ => return Err("Unknown test type. Use: quick, standard, intensive, extreme, debug, or custom".to_string()), + }; + + println!("๐ŸŽฏ Test Configuration:"); + println!(" Total operations: {}", ops); + println!(" Concurrent clients: {}", clients); + println!(" Value size: {} bytes", value_size); + println!(); + + // Prepare some data for read tests + println!("๐Ÿ“ Preparing test data..."); + let mut prep_client = RedisClient::connect_with_auth(&addr, &password).await?; + let prep_value = vec![0x42u8; value_size]; + + for i in 0..10 { + for j in 0..100 { + let key = format!("worker{}:key{}", i, j); + let _ = prep_client.set(&key, &prep_value).await; + } + } + println!("โœ“ Test data prepared"); + println!(); + + // Run benchmarks + let mut results = Vec::new(); + + // SET benchmark + match tester.redis_benchmark_set(ops, clients, value_size).await { + Ok(result) => { + result.print(); + results.push(result); + }, + Err(e) => println!("SET benchmark failed: {}", e), + } + + // GET benchmark + match tester.redis_benchmark_get(ops, clients).await { + Ok(result) => { + result.print(); + results.push(result); + }, + Err(e) => println!("GET benchmark failed: {}", e), + } + + // MIXED benchmark + match tester.redis_benchmark_mixed(ops, clients, value_size).await { + Ok(result) => { + result.print(); + results.push(result); + }, + Err(e) => println!("MIXED benchmark failed: {}", e), + } + + // Summary + println!("๐Ÿ“ˆ SUMMARY"); + println!("=========="); + for result in &results { + let error_rate = if result.total_operations + result.errors > 0 { + format!("{:.1}%", (result.errors as f64 / (result.total_operations + result.errors) as f64) * 100.0) + } else { + "0.0%".to_string() + }; + + println!("{:8} | {:>10.0} ops/sec | {:>8.2}ฮผs avg latency | {} errors ({})", + result.operation, + result.ops_per_second, + result.avg_latency_us, + result.errors, + error_rate + ); + } + + if let Some(best) = results.iter().filter(|r| r.ops_per_second > 0.0).max_by(|a, b| a.ops_per_second.partial_cmp(&b.ops_per_second).unwrap()) { + println!(); + println!("๐Ÿ† Best performance: {} with {:.0} ops/sec", best.operation, best.ops_per_second); + } + + // Additional Redis-specific information + println!(); + println!("๐Ÿ“Š Redis Connection Info:"); + println!(" Server: {}", addr); + println!(" Authentication: {}", if password != "" { "Yes" } else { "No" }); + println!(); + println!("๐Ÿ’ก Tips for better Redis performance:"); + println!(" - Use Redis pipelining for batch operations"); + println!(" - Consider Redis Cluster for horizontal scaling"); + println!(" - Monitor memory usage with 'redis-cli info memory'"); + println!(" - Use appropriate data structures (strings, hashes, sets, etc.)"); + + Ok(()) +} diff --git a/bin/skytable.rs b/bin/skytable.rs new file mode 100644 index 0000000..330a801 --- /dev/null +++ b/bin/skytable.rs @@ -0,0 +1,161 @@ +use benchmark_oversdb::{StressTester, SkytableClient}; +use std::env; + +#[tokio::main] +async fn main() -> Result<(), String> { + let args: Vec = std::env::args().collect(); + + if args.len() < 2 { + println!("Skytable Stress Test & Benchmark Tool"); + println!("====================================="); + println!(); + println!("Usage: {} [options]", args[0]); + println!(); + println!("Test types:"); + println!(" quick - Quick test (1000 ops, 10 clients)"); + println!(" standard - Standard test (10000 ops, 50 clients)"); + println!(" intensive - Intensive test (100000 ops, 100 clients)"); + println!(" extreme - Extreme test (1000000 ops, 200 clients)"); + println!(" debug - Debug test (100 ops, 5 clients)"); + println!(" custom - Custom parameters"); + println!(); + println!("Environment variables:"); + println!(" SKYTABLE_HOST - Server host (default: 127.0.0.1)"); + println!(" SKYTABLE_USERNAME - Username (default: root)"); + println!(" SKYTABLE_PASSWORD - Password (default: password)"); + println!(); + println!("Examples:"); + println!(" {} quick", args[0]); + println!(" {} custom 50000 75 1024", args[0]); + println!(" SKYTABLE_PASSWORD=mypass {} standard", args[0]); + return Ok(()); + } + + // Get connection details from environment or use defaults + let host = env::var("SKYTABLE_HOST").unwrap_or_else(|_| "127.0.0.1".to_string()); + let username = env::var("SKYTABLE_USERNAME").unwrap_or_else(|_| "root".to_string()); + let password = env::var("SKYTABLE_PASSWORD").unwrap_or_else(|_| "password".to_string()); + + let tester = StressTester::new(host.clone(), username.clone(), password.clone()); + + println!("๐Ÿ”ง Connecting to Skytable server at {}:2003...", host); + println!(" Using credentials: {}/{}", username, password); + + match SkytableClient::connect_with_auth(&host, &username, &password).await { + Ok(mut client) => { + println!("โœ“ Connection successful!"); + // Setup benchmark space + match client.setup_benchmark_space().await { + Ok(_) => println!("โœ“ Benchmark space setup complete!"), + Err(e) => { + println!("โš  Warning: Benchmark space setup failed: {}", e); + println!(" This might be normal if using basic key-value mode"); + } + } + }, + Err(e) => { + println!("โœ— Failed to connect: {}", e); + println!("Make sure the Skytable server is running on {}:2003", host); + println!("and that you have the correct credentials"); + println!("You can set credentials using environment variables:"); + println!(" SKYTABLE_USERNAME=your_user SKYTABLE_PASSWORD=your_pass {} ", args[0]); + return Err(e); + } + } + println!(); + + let (ops, clients, value_size) = match args[1].as_str() { + "quick" => (1_000, 10, 256), + "standard" => (10_000, 50, 256), + "intensive" => (100_000, 100, 256), + "extreme" => (1_000_000, 200, 256), + "debug" => (100, 5, 64), + "custom" => { + if args.len() < 5 { + return Err("Custom test requires: custom ".to_string()); + } + let ops = args[2].parse().map_err(|_| "Invalid operations count")?; + let clients = args[3].parse().map_err(|_| "Invalid client count")?; + let value_size = args[4].parse().map_err(|_| "Invalid value size")?; + (ops, clients, value_size) + }, + _ => return Err("Unknown test type. Use: quick, standard, intensive, extreme, debug, or custom".to_string()), + }; + + println!("๐ŸŽฏ Test Configuration:"); + println!(" Total operations: {}", ops); + println!(" Concurrent clients: {}", clients); + println!(" Value size: {} bytes", value_size); + println!(); + + // Prepare some data for read tests + println!("๐Ÿ“ Preparing test data..."); + let mut prep_client = SkytableClient::connect_with_auth(&host, &username, &password).await?; + prep_client.setup_benchmark_space().await.ok(); // Ignore errors + let prep_value = vec![0x42u8; value_size]; + + for i in 0..10 { + for j in 0..100 { + let key = format!("worker{}:key{}", i, j); + let _ = prep_client.set(&key, &prep_value).await; + } + } + println!("โœ“ Test data prepared"); + println!(); + + // Run benchmarks + let mut results = Vec::new(); + + // SET benchmark + match tester.skytable_benchmark_set(ops, clients, value_size).await { + Ok(result) => { + result.print(); + results.push(result); + }, + Err(e) => println!("SET benchmark failed: {}", e), + } + + // GET benchmark + match tester.skytable_benchmark_get(ops, clients).await { + Ok(result) => { + result.print(); + results.push(result); + }, + Err(e) => println!("GET benchmark failed: {}", e), + } + + // MIXED benchmark + match tester.skytable_benchmark_mixed(ops, clients, value_size).await { + Ok(result) => { + result.print(); + results.push(result); + }, + Err(e) => println!("MIXED benchmark failed: {}", e), + } + + // Summary + println!("๐Ÿ“ˆ SUMMARY"); + println!("=========="); + for result in &results { + let error_rate = if result.total_operations + result.errors > 0 { + format!("{:.1}%", (result.errors as f64 / (result.total_operations + result.errors) as f64) * 100.0) + } else { + "0.0%".to_string() + }; + + println!("{:8} | {:>10.0} ops/sec | {:>8.2}ฮผs avg latency | {} errors ({})", + result.operation, + result.ops_per_second, + result.avg_latency_us, + result.errors, + error_rate + ); + } + + if let Some(best) = results.iter().filter(|r| r.ops_per_second > 0.0).max_by(|a, b| a.ops_per_second.partial_cmp(&b.ops_per_second).unwrap()) { + println!(); + println!("๐Ÿ† Best performance: {} with {:.0} ops/sec", best.operation, best.ops_per_second); + } + + Ok(()) +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..a64df1a --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,1072 @@ +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::time::{Duration, Instant}; +use tokio::sync::Semaphore; + +pub mod oversdb; +pub mod skytable_client; +pub mod redis_client; + +pub use oversdb::{OversDBClient, CrudOperation}; +pub use skytable_client::{SkytableClient, SkytableOperation}; +pub use redis_client::{RedisClient, RedisOperation}; + +#[derive(Debug)] +pub struct BenchmarkResult { + pub operation: String, + pub total_operations: u64, + pub duration: Duration, + pub ops_per_second: f64, + pub avg_latency_us: f64, + pub errors: u64, + pub concurrent_connections: usize, +} + +impl BenchmarkResult { + pub fn print(&self) { + println!("๐Ÿ“Š {} Results:", self.operation); + println!(" Total operations: {}", self.total_operations); + println!(" Duration: {:.2}s", self.duration.as_secs_f64()); + println!(" Operations/second: {:.0}", self.ops_per_second); + println!(" Average latency: {:.2}ฮผs", self.avg_latency_us); + println!(" Errors: {}", self.errors); + println!(" Concurrent clients: {}", self.concurrent_connections); + println!(); + } +} + +pub struct StressTester { + addr: String, + username: String, + password: String, +} + +impl StressTester { + pub fn new(addr: String, username: String, password: String) -> Self { + Self { addr, username, password } + } + + // OversDB worker methods (existing) + async fn worker_create( + &self, + worker_id: usize, + operations_per_worker: usize, + value_size: usize, + success_counter: Arc, + error_counter: Arc, + semaphore: Arc, + start_time: Instant, + ) -> Result { + let _permit = semaphore.acquire().await.unwrap(); + + let mut client = OversDBClient::connect(&self.addr).await?; + let value = vec![0x42u8; value_size]; + + let worker_start = Instant::now(); + + for i in 0..operations_per_worker { + let timestamp = start_time.elapsed().as_nanos(); + let key = format!("create:{}:{}:{}", timestamp, worker_id, i); + + match client.create("benchmark", "create_test", key.as_bytes(), &value, 0).await { + Ok(_) => { + success_counter.fetch_add(1, Ordering::Relaxed); + }, + Err(e) => { + error_counter.fetch_add(1, Ordering::Relaxed); + if error_counter.load(Ordering::Relaxed) <= 5 { + eprintln!("CREATE error: {}", e); + } + } + } + } + + Ok(worker_start.elapsed()) + } + + async fn worker_read( + &self, + worker_id: usize, + operations_per_worker: usize, + success_counter: Arc, + error_counter: Arc, + semaphore: Arc, + ) -> Result { + let _permit = semaphore.acquire().await.unwrap(); + + let mut client = OversDBClient::connect(&self.addr).await?; + + let start = Instant::now(); + + for i in 0..operations_per_worker { + let key = format!("worker{}:key{}", worker_id % 10, i % 100); + + match client.read("benchmark", "test", key.as_bytes()).await { + Ok(_) => { + success_counter.fetch_add(1, Ordering::Relaxed); + }, + Err(_) => { + error_counter.fetch_add(1, Ordering::Relaxed); + } + } + } + + Ok(start.elapsed()) + } + + async fn worker_mixed( + &self, + worker_id: usize, + operations_per_worker: usize, + value_size: usize, + success_counter: Arc, + error_counter: Arc, + semaphore: Arc, + start_time: Instant, + ) -> Result { + let _permit = semaphore.acquire().await.unwrap(); + + let mut client = OversDBClient::connect(&self.addr).await?; + let value = vec![0x42u8; value_size]; + + let worker_start = Instant::now(); + + let base_keys: Vec = (0..25).map(|i| format!("mixed:base:{}:{}", worker_id, i)).collect(); + for key in &base_keys { + let _ = client.create("benchmark", "mixed", key.as_bytes(), &value, 0).await; + } + + for i in 0..operations_per_worker { + let operation = i % 4; + let result = match operation { + 0 => { + let timestamp = start_time.elapsed().as_nanos(); + let key = format!("mixed:create:{}:{}:{}", timestamp, worker_id, i); + client.create("benchmark", "mixed", key.as_bytes(), &value, 0).await.map(|_| ()) + }, + 1 => { + let key = if i < base_keys.len() { + base_keys[i % base_keys.len()].clone() + } else { + format!("worker{}:key{}", worker_id % 10, i % 100) + }; + + let table = if i < base_keys.len() { "mixed" } else { "test" }; + client.read("benchmark", table, key.as_bytes()).await.map(|_| ()) + }, + 2 => { + let key = &base_keys[i % base_keys.len()]; + let new_value = format!("updated_{}_{}", worker_id, i).into_bytes(); + client.update("benchmark", "mixed", key.as_bytes(), &new_value, 0).await + }, + 3 => { + let key = &base_keys[i % base_keys.len()]; + match client.delete("benchmark", "mixed", key.as_bytes()).await { + Ok(_) => { + client.create("benchmark", "mixed", key.as_bytes(), &value, 0).await + }, + Err(e) => { + match client.create("benchmark", "mixed", key.as_bytes(), &value, 0).await { + Ok(_) => Ok(()), + Err(_) => Err(e), + } + }, + } + }, + _ => unreachable!(), + }; + + match result { + Ok(_) => { + success_counter.fetch_add(1, Ordering::Relaxed); + }, + Err(e) => { + error_counter.fetch_add(1, Ordering::Relaxed); + let error_count = error_counter.load(Ordering::Relaxed); + if error_count <= 5 { + eprintln!("MIXED operation {} (worker {}, iter {}) error: {}", + match operation { + 0 => "CREATE", + 1 => "READ", + 2 => "UPDATE", + 3 => "DELETE", + _ => "UNKNOWN" + }, worker_id, i, e); + } + } + } + } + + Ok(worker_start.elapsed()) + } + + // Skytable worker methods + async fn skytable_worker_set( + &self, + worker_id: usize, + operations_per_worker: usize, + value_size: usize, + success_counter: Arc, + error_counter: Arc, + semaphore: Arc, + start_time: Instant, + ) -> Result { + let _permit = semaphore.acquire().await.unwrap(); + + let mut client = SkytableClient::connect(&self.addr, &self.username, &self.password).await?; + client.setup_benchmark_space().await?; + + let value = vec![0x42u8; value_size]; + let worker_start = Instant::now(); + + for i in 0..operations_per_worker { + let timestamp = start_time.elapsed().as_nanos(); + let key = format!("set:{}:{}:{}", timestamp, worker_id, i); + + match client.set(&key, &value).await { + Ok(_) => { + success_counter.fetch_add(1, Ordering::Relaxed); + }, + Err(e) => { + error_counter.fetch_add(1, Ordering::Relaxed); + if error_counter.load(Ordering::Relaxed) <= 5 { + eprintln!("SET error: {}", e); + } + } + } + } + + Ok(worker_start.elapsed()) + } + + async fn skytable_worker_get( + &self, + worker_id: usize, + operations_per_worker: usize, + success_counter: Arc, + error_counter: Arc, + semaphore: Arc, + ) -> Result { + let _permit = semaphore.acquire().await.unwrap(); + + let mut client = SkytableClient::connect(&self.addr, &self.username, &self.password).await?; + client.setup_benchmark_space().await?; + + let start = Instant::now(); + + for i in 0..operations_per_worker { + let key = format!("worker{}:key{}", worker_id % 10, i % 100); + + match client.get(&key).await { + Ok(_) => { + success_counter.fetch_add(1, Ordering::Relaxed); + }, + Err(_) => { + error_counter.fetch_add(1, Ordering::Relaxed); + } + } + } + + Ok(start.elapsed()) + } + + async fn skytable_worker_mixed( + &self, + worker_id: usize, + operations_per_worker: usize, + value_size: usize, + success_counter: Arc, + error_counter: Arc, + semaphore: Arc, + start_time: Instant, + ) -> Result { + let _permit = semaphore.acquire().await.unwrap(); + + let mut client = SkytableClient::connect(&self.addr, &self.username, &self.password).await?; + client.setup_benchmark_space().await?; + + let value = vec![0x42u8; value_size]; + let worker_start = Instant::now(); + + let base_keys: Vec = (0..25).map(|i| format!("mixed:base:{}:{}", worker_id, i)).collect(); + for key in &base_keys { + let _ = client.set(key, &value).await; + } + + for i in 0..operations_per_worker { + let operation = i % 4; + let result = match operation { + 0 => { + let timestamp = start_time.elapsed().as_nanos(); + let key = format!("mixed:set:{}:{}:{}", timestamp, worker_id, i); + client.set(&key, &value).await + }, + 1 => { + let key = if i < base_keys.len() { + base_keys[i % base_keys.len()].clone() + } else { + format!("worker{}:key{}", worker_id % 10, i % 100) + }; + client.get(&key).await.map(|_| ()) + }, + 2 => { + let key = &base_keys[i % base_keys.len()]; + let new_value = format!("updated_{}_{}", worker_id, i).into_bytes(); + client.update(key, &new_value).await + }, + 3 => { + let key = &base_keys[i % base_keys.len()]; + match client.delete(key).await { + Ok(_) => { + client.set(key, &value).await + }, + Err(e) => { + match client.set(key, &value).await { + Ok(_) => Ok(()), + Err(_) => Err(e), + } + }, + } + }, + _ => unreachable!(), + }; + + match result { + Ok(_) => { + success_counter.fetch_add(1, Ordering::Relaxed); + }, + Err(e) => { + error_counter.fetch_add(1, Ordering::Relaxed); + let error_count = error_counter.load(Ordering::Relaxed); + if error_count <= 5 { + eprintln!("MIXED operation {} (worker {}, iter {}) error: {}", + match operation { + 0 => "SET", + 1 => "GET", + 2 => "UPDATE", + 3 => "DELETE", + _ => "UNKNOWN" + }, worker_id, i, e); + } + } + } + } + + Ok(worker_start.elapsed()) + } + + // Redis worker methods + async fn redis_worker_set( + &self, + worker_id: usize, + operations_per_worker: usize, + value_size: usize, + success_counter: Arc, + error_counter: Arc, + semaphore: Arc, + start_time: Instant, + ) -> Result { + let _permit = semaphore.acquire().await.unwrap(); + + let mut client = RedisClient::connect(&self.addr, &self.password).await?; + let value = vec![0x42u8; value_size]; + let worker_start = Instant::now(); + + for i in 0..operations_per_worker { + let timestamp = start_time.elapsed().as_nanos(); + let key = format!("set:{}:{}:{}", timestamp, worker_id, i); + + match client.set(&key, &value).await { + Ok(_) => { + success_counter.fetch_add(1, Ordering::Relaxed); + }, + Err(e) => { + error_counter.fetch_add(1, Ordering::Relaxed); + if error_counter.load(Ordering::Relaxed) <= 5 { + eprintln!("SET error: {}", e); + } + } + } + } + + Ok(worker_start.elapsed()) + } + + async fn redis_worker_get( + &self, + worker_id: usize, + operations_per_worker: usize, + success_counter: Arc, + error_counter: Arc, + semaphore: Arc, + ) -> Result { + let _permit = semaphore.acquire().await.unwrap(); + + let mut client = RedisClient::connect(&self.addr, &self.password).await?; + let start = Instant::now(); + + for i in 0..operations_per_worker { + let key = format!("worker{}:key{}", worker_id % 10, i % 100); + + match client.get(&key).await { + Ok(_) => { + success_counter.fetch_add(1, Ordering::Relaxed); + }, + Err(_) => { + error_counter.fetch_add(1, Ordering::Relaxed); + } + } + } + + Ok(start.elapsed()) + } + + async fn redis_worker_mixed( + &self, + worker_id: usize, + operations_per_worker: usize, + value_size: usize, + success_counter: Arc, + error_counter: Arc, + semaphore: Arc, + start_time: Instant, + ) -> Result { + let _permit = semaphore.acquire().await.unwrap(); + + let mut client = RedisClient::connect(&self.addr, &self.password).await?; + let value = vec![0x42u8; value_size]; + let worker_start = Instant::now(); + + let base_keys: Vec = (0..25).map(|i| format!("mixed:base:{}:{}", worker_id, i)).collect(); + for key in &base_keys { + let _ = client.set(key, &value).await; + } + + for i in 0..operations_per_worker { + let operation = i % 4; + let result = match operation { + 0 => { + let timestamp = start_time.elapsed().as_nanos(); + let key = format!("mixed:set:{}:{}:{}", timestamp, worker_id, i); + client.set(&key, &value).await + }, + 1 => { + let key = if i < base_keys.len() { + base_keys[i % base_keys.len()].clone() + } else { + format!("worker{}:key{}", worker_id % 10, i % 100) + }; + client.get(&key).await.map(|_| ()) + }, + 2 => { + let key = &base_keys[i % base_keys.len()]; + let new_value = format!("updated_{}_{}", worker_id, i).into_bytes(); + client.update(key, &new_value).await + }, + 3 => { + let key = &base_keys[i % base_keys.len()]; + match client.delete(key).await { + Ok(_) => { + client.set(key, &value).await + }, + Err(e) => { + match client.set(key, &value).await { + Ok(_) => Ok(()), + Err(_) => Err(e), + } + }, + } + }, + _ => unreachable!(), + }; + + match result { + Ok(_) => { + success_counter.fetch_add(1, Ordering::Relaxed); + }, + Err(e) => { + error_counter.fetch_add(1, Ordering::Relaxed); + let error_count = error_counter.load(Ordering::Relaxed); + if error_count <= 5 { + eprintln!("MIXED operation {} (worker {}, iter {}) error: {}", + match operation { + 0 => "SET", + 1 => "GET", + 2 => "UPDATE", + 3 => "DELETE", + _ => "UNKNOWN" + }, worker_id, i, e); + } + } + } + } + + Ok(worker_start.elapsed()) + } + + // OversDB benchmark methods + pub async fn benchmark_create( + &self, + total_operations: usize, + concurrent_clients: usize, + value_size: usize, + ) -> Result { + println!("๐Ÿš€ Starting CREATE benchmark..."); + println!(" Operations: {}", total_operations); + println!(" Concurrent clients: {}", concurrent_clients); + println!(" Value size: {} bytes", value_size); + + let operations_per_worker = total_operations / concurrent_clients; + let success_counter = Arc::new(AtomicU64::new(0)); + let error_counter = Arc::new(AtomicU64::new(0)); + let semaphore = Arc::new(Semaphore::new(concurrent_clients)); + + let start_time = Instant::now(); + + let mut handles = Vec::new(); + for worker_id in 0..concurrent_clients { + let success_counter = Arc::clone(&success_counter); + let error_counter = Arc::clone(&error_counter); + let semaphore = Arc::clone(&semaphore); + let tester = self.clone(); + + let handle = tokio::spawn(async move { + tester.worker_create(worker_id, operations_per_worker, value_size, success_counter, error_counter, semaphore, start_time).await + }); + + handles.push(handle); + } + + let mut total_worker_time = Duration::new(0, 0); + for handle in handles { + match handle.await { + Ok(Ok(duration)) => total_worker_time += duration, + Ok(Err(e)) => return Err(format!("Worker error: {}", e)), + Err(e) => return Err(format!("Join error: {}", e)), + } + } + + let total_duration = start_time.elapsed(); + let total_ops = success_counter.load(Ordering::Relaxed); + let errors = error_counter.load(Ordering::Relaxed); + + Ok(BenchmarkResult { + operation: "CREATE".to_string(), + total_operations: total_ops, + duration: total_duration, + ops_per_second: if total_duration.as_secs_f64() > 0.0 { total_ops as f64 / total_duration.as_secs_f64() } else { 0.0 }, + avg_latency_us: if total_ops > 0 { total_worker_time.as_micros() as f64 / total_ops as f64 } else { 0.0 }, + errors, + concurrent_connections: concurrent_clients, + }) + } + + pub async fn benchmark_read( + &self, + total_operations: usize, + concurrent_clients: usize, + ) -> Result { + println!("๐Ÿš€ Starting READ benchmark..."); + println!(" Operations: {}", total_operations); + println!(" Concurrent clients: {}", concurrent_clients); + + let operations_per_worker = total_operations / concurrent_clients; + let success_counter = Arc::new(AtomicU64::new(0)); + let error_counter = Arc::new(AtomicU64::new(0)); + let semaphore = Arc::new(Semaphore::new(concurrent_clients)); + + let start = Instant::now(); + + let mut handles = Vec::new(); + for worker_id in 0..concurrent_clients { + let success_counter = Arc::clone(&success_counter); + let error_counter = Arc::clone(&error_counter); + let semaphore = Arc::clone(&semaphore); + let tester = self.clone(); + + let handle = tokio::spawn(async move { + tester.worker_read(worker_id, operations_per_worker, success_counter, error_counter, semaphore).await + }); + + handles.push(handle); + } + + let mut total_worker_time = Duration::new(0, 0); + for handle in handles { + match handle.await { + Ok(Ok(duration)) => total_worker_time += duration, + Ok(Err(e)) => return Err(format!("Worker error: {}", e)), + Err(e) => return Err(format!("Join error: {}", e)), + } + } + + let total_duration = start.elapsed(); + let total_ops = success_counter.load(Ordering::Relaxed); + let errors = error_counter.load(Ordering::Relaxed); + + Ok(BenchmarkResult { + operation: "READ".to_string(), + total_operations: total_ops, + duration: total_duration, + ops_per_second: total_ops as f64 / total_duration.as_secs_f64(), + avg_latency_us: total_worker_time.as_micros() as f64 / total_ops as f64, + errors, + concurrent_connections: concurrent_clients, + }) + } + + pub async fn benchmark_mixed( + &self, + total_operations: usize, + concurrent_clients: usize, + value_size: usize, + ) -> Result { + println!("๐Ÿš€ Starting MIXED benchmark (25% each: CREATE, READ, UPDATE, DELETE)..."); + println!(" Operations: {}", total_operations); + println!(" Concurrent clients: {}", concurrent_clients); + println!(" Value size: {} bytes", value_size); + + let operations_per_worker = total_operations / concurrent_clients; + let success_counter = Arc::new(AtomicU64::new(0)); + let error_counter = Arc::new(AtomicU64::new(0)); + let semaphore = Arc::new(Semaphore::new(concurrent_clients)); + + let start_time = Instant::now(); + + let mut handles = Vec::new(); + for worker_id in 0..concurrent_clients { + let success_counter = Arc::clone(&success_counter); + let error_counter = Arc::clone(&error_counter); + let semaphore = Arc::clone(&semaphore); + let tester = self.clone(); + + let handle = tokio::spawn(async move { + tester.worker_mixed(worker_id, operations_per_worker, value_size, success_counter, error_counter, semaphore, start_time).await + }); + + handles.push(handle); + } + + let mut total_worker_time = Duration::new(0, 0); + for handle in handles { + match handle.await { + Ok(Ok(duration)) => total_worker_time += duration, + Ok(Err(e)) => return Err(format!("Worker error: {}", e)), + Err(e) => return Err(format!("Join error: {}", e)), + } + } + + let total_duration = start_time.elapsed(); + let total_ops = success_counter.load(Ordering::Relaxed); + let errors = error_counter.load(Ordering::Relaxed); + + Ok(BenchmarkResult { + operation: "MIXED".to_string(), + total_operations: total_ops, + duration: total_duration, + ops_per_second: if total_duration.as_secs_f64() > 0.0 { total_ops as f64 / total_duration.as_secs_f64() } else { 0.0 }, + avg_latency_us: if total_ops > 0 { total_worker_time.as_micros() as f64 / total_ops as f64 } else { 0.0 }, + errors, + concurrent_connections: concurrent_clients, + }) + } + + // Skytable benchmark methods + pub async fn skytable_benchmark_set( + &self, + total_operations: usize, + concurrent_clients: usize, + value_size: usize, + ) -> Result { + println!("๐Ÿš€ Starting Skytable SET benchmark..."); + println!(" Operations: {}", total_operations); + println!(" Concurrent clients: {}", concurrent_clients); + println!(" Value size: {} bytes", value_size); + + let operations_per_worker = total_operations / concurrent_clients; + let success_counter = Arc::new(AtomicU64::new(0)); + let error_counter = Arc::new(AtomicU64::new(0)); + let semaphore = Arc::new(Semaphore::new(concurrent_clients)); + + let start_time = Instant::now(); + + let mut handles = Vec::new(); + for worker_id in 0..concurrent_clients { + let success_counter = Arc::clone(&success_counter); + let error_counter = Arc::clone(&error_counter); + let semaphore = Arc::clone(&semaphore); + let tester = self.clone(); + + let handle = tokio::spawn(async move { + tester.skytable_worker_set(worker_id, operations_per_worker, value_size, success_counter, error_counter, semaphore, start_time).await + }); + + handles.push(handle); + } + + let mut total_worker_time = Duration::new(0, 0); + for handle in handles { + match handle.await { + Ok(Ok(duration)) => total_worker_time += duration, + Ok(Err(e)) => return Err(format!("Worker error: {}", e)), + Err(e) => return Err(format!("Join error: {}", e)), + } + } + + let total_duration = start_time.elapsed(); + let total_ops = success_counter.load(Ordering::Relaxed); + let errors = error_counter.load(Ordering::Relaxed); + + Ok(BenchmarkResult { + operation: "SET".to_string(), + total_operations: total_ops, + duration: total_duration, + ops_per_second: if total_duration.as_secs_f64() > 0.0 { total_ops as f64 / total_duration.as_secs_f64() } else { 0.0 }, + avg_latency_us: if total_ops > 0 { total_worker_time.as_micros() as f64 / total_ops as f64 } else { 0.0 }, + errors, + concurrent_connections: concurrent_clients, + }) + } + + pub async fn skytable_benchmark_get( + &self, + total_operations: usize, + concurrent_clients: usize, + ) -> Result { + println!("๐Ÿš€ Starting Skytable GET benchmark..."); + println!(" Operations: {}", total_operations); + println!(" Concurrent clients: {}", concurrent_clients); + + let operations_per_worker = total_operations / concurrent_clients; + let success_counter = Arc::new(AtomicU64::new(0)); + let error_counter = Arc::new(AtomicU64::new(0)); + let semaphore = Arc::new(Semaphore::new(concurrent_clients)); + + let start = Instant::now(); + + let mut handles = Vec::new(); + for worker_id in 0..concurrent_clients { + let success_counter = Arc::clone(&success_counter); + let error_counter = Arc::clone(&error_counter); + let semaphore = Arc::clone(&semaphore); + let tester = self.clone(); + + let handle = tokio::spawn(async move { + tester.skytable_worker_get(worker_id, operations_per_worker, success_counter, error_counter, semaphore).await + }); + + handles.push(handle); + } + + let mut total_worker_time = Duration::new(0, 0); + for handle in handles { + match handle.await { + Ok(Ok(duration)) => total_worker_time += duration, + Ok(Err(e)) => return Err(format!("Worker error: {}", e)), + Err(e) => return Err(format!("Join error: {}", e)), + } + } + + let total_duration = start.elapsed(); + let total_ops = success_counter.load(Ordering::Relaxed); + let errors = error_counter.load(Ordering::Relaxed); + + Ok(BenchmarkResult { + operation: "GET".to_string(), + total_operations: total_ops, + duration: total_duration, + ops_per_second: total_ops as f64 / total_duration.as_secs_f64(), + avg_latency_us: total_worker_time.as_micros() as f64 / total_ops as f64, + errors, + concurrent_connections: concurrent_clients, + }) + } + + pub async fn skytable_benchmark_mixed( + &self, + total_operations: usize, + concurrent_clients: usize, + value_size: usize, + ) -> Result { + println!("๐Ÿš€ Starting Skytable MIXED benchmark (25% each: SET, GET, UPDATE, DELETE)..."); + println!(" Operations: {}", total_operations); + println!(" Concurrent clients: {}", concurrent_clients); + println!(" Value size: {} bytes", value_size); + + let operations_per_worker = total_operations / concurrent_clients; + let success_counter = Arc::new(AtomicU64::new(0)); + let error_counter = Arc::new(AtomicU64::new(0)); + let semaphore = Arc::new(Semaphore::new(concurrent_clients)); + + let start_time = Instant::now(); + + let mut handles = Vec::new(); + for worker_id in 0..concurrent_clients { + let success_counter = Arc::clone(&success_counter); + let error_counter = Arc::clone(&error_counter); + let semaphore = Arc::clone(&semaphore); + let tester = self.clone(); + + let handle = tokio::spawn(async move { + tester.skytable_worker_mixed(worker_id, operations_per_worker, value_size, success_counter, error_counter, semaphore, start_time).await + }); + + handles.push(handle); + } + + let mut total_worker_time = Duration::new(0, 0); + for handle in handles { + match handle.await { + Ok(Ok(duration)) => total_worker_time += duration, + Ok(Err(e)) => return Err(format!("Worker error: {}", e)), + Err(e) => return Err(format!("Join error: {}", e)), + } + } + + let total_duration = start_time.elapsed(); + let total_ops = success_counter.load(Ordering::Relaxed); + let errors = error_counter.load(Ordering::Relaxed); + + Ok(BenchmarkResult { + operation: "MIXED".to_string(), + total_operations: total_ops, + duration: total_duration, + ops_per_second: if total_duration.as_secs_f64() > 0.0 { total_ops as f64 / total_duration.as_secs_f64() } else { 0.0 }, + avg_latency_us: if total_ops > 0 { total_worker_time.as_micros() as f64 / total_ops as f64 } else { 0.0 }, + errors, + concurrent_connections: concurrent_clients, + }) + } + + // Redis benchmark methods + pub async fn redis_benchmark_set( + &self, + total_operations: usize, + concurrent_clients: usize, + value_size: usize, + ) -> Result { + println!("๐Ÿš€ Starting Redis SET benchmark..."); + println!(" Operations: {}", total_operations); + println!(" Concurrent clients: {}", concurrent_clients); + println!(" Value size: {} bytes", value_size); + + let operations_per_worker = total_operations / concurrent_clients; + let success_counter = Arc::new(AtomicU64::new(0)); + let error_counter = Arc::new(AtomicU64::new(0)); + let semaphore = Arc::new(Semaphore::new(concurrent_clients)); + + let start_time = Instant::now(); + + let mut handles = Vec::new(); + for worker_id in 0..concurrent_clients { + let success_counter = Arc::clone(&success_counter); + let error_counter = Arc::clone(&error_counter); + let semaphore = Arc::clone(&semaphore); + let tester = self.clone(); + + let handle = tokio::spawn(async move { + tester.redis_worker_set(worker_id, operations_per_worker, value_size, success_counter, error_counter, semaphore, start_time).await + }); + + handles.push(handle); + } + + let mut total_worker_time = Duration::new(0, 0); + for handle in handles { + match handle.await { + Ok(Ok(duration)) => total_worker_time += duration, + Ok(Err(e)) => return Err(format!("Worker error: {}", e)), + Err(e) => return Err(format!("Join error: {}", e)), + } + } + + let total_duration = start_time.elapsed(); + let total_ops = success_counter.load(Ordering::Relaxed); + let errors = error_counter.load(Ordering::Relaxed); + + Ok(BenchmarkResult { + operation: "SET".to_string(), + total_operations: total_ops, + duration: total_duration, + ops_per_second: if total_duration.as_secs_f64() > 0.0 { total_ops as f64 / total_duration.as_secs_f64() } else { 0.0 }, + avg_latency_us: if total_ops > 0 { total_worker_time.as_micros() as f64 / total_ops as f64 } else { 0.0 }, + errors, + concurrent_connections: concurrent_clients, + }) + } + + pub async fn redis_benchmark_get( + &self, + total_operations: usize, + concurrent_clients: usize, + ) -> Result { + println!("๐Ÿš€ Starting Redis GET benchmark..."); + println!(" Operations: {}", total_operations); + println!(" Concurrent clients: {}", concurrent_clients); + + let operations_per_worker = total_operations / concurrent_clients; + let success_counter = Arc::new(AtomicU64::new(0)); + let error_counter = Arc::new(AtomicU64::new(0)); + let semaphore = Arc::new(Semaphore::new(concurrent_clients)); + + let start = Instant::now(); + + let mut handles = Vec::new(); + for worker_id in 0..concurrent_clients { + let success_counter = Arc::clone(&success_counter); + let error_counter = Arc::clone(&error_counter); + let semaphore = Arc::clone(&semaphore); + let tester = self.clone(); + + let handle = tokio::spawn(async move { + tester.redis_worker_get(worker_id, operations_per_worker, success_counter, error_counter, semaphore).await + }); + + handles.push(handle); + } + + let mut total_worker_time = Duration::new(0, 0); + for handle in handles { + match handle.await { + Ok(Ok(duration)) => total_worker_time += duration, + Ok(Err(e)) => return Err(format!("Worker error: {}", e)), + Err(e) => return Err(format!("Join error: {}", e)), + } + } + + let total_duration = start.elapsed(); + let total_ops = success_counter.load(Ordering::Relaxed); + let errors = error_counter.load(Ordering::Relaxed); + + Ok(BenchmarkResult { + operation: "GET".to_string(), + total_operations: total_ops, + duration: total_duration, + ops_per_second: total_ops as f64 / total_duration.as_secs_f64(), + avg_latency_us: total_worker_time.as_micros() as f64 / total_ops as f64, + errors, + concurrent_connections: concurrent_clients, + }) + } + + pub async fn redis_benchmark_mixed( + &self, + total_operations: usize, + concurrent_clients: usize, + value_size: usize, + ) -> Result { + println!("๐Ÿš€ Starting Redis MIXED benchmark (25% each: SET, GET, UPDATE, DELETE)..."); + println!(" Operations: {}", total_operations); + println!(" Concurrent clients: {}", concurrent_clients); + println!(" Value size: {} bytes", value_size); + + let operations_per_worker = total_operations / concurrent_clients; + let success_counter = Arc::new(AtomicU64::new(0)); + let error_counter = Arc::new(AtomicU64::new(0)); + let semaphore = Arc::new(Semaphore::new(concurrent_clients)); + + let start_time = Instant::now(); + + let mut handles = Vec::new(); + for worker_id in 0..concurrent_clients { + let success_counter = Arc::clone(&success_counter); + let error_counter = Arc::clone(&error_counter); + let semaphore = Arc::clone(&semaphore); + let tester = self.clone(); + + let handle = tokio::spawn(async move { + tester.redis_worker_mixed(worker_id, operations_per_worker, value_size, success_counter, error_counter, semaphore, start_time).await + }); + + handles.push(handle); + } + + let mut total_worker_time = Duration::new(0, 0); + for handle in handles { + match handle.await { + Ok(Ok(duration)) => total_worker_time += duration, + Ok(Err(e)) => return Err(format!("Worker error: {}", e)), + Err(e) => return Err(format!("Join error: {}", e)), + } + } + + let total_duration = start_time.elapsed(); + let total_ops = success_counter.load(Ordering::Relaxed); + let errors = error_counter.load(Ordering::Relaxed); + + Ok(BenchmarkResult { + operation: "MIXED".to_string(), + total_operations: total_ops, + duration: total_duration, + ops_per_second: if total_duration.as_secs_f64() > 0.0 { total_ops as f64 / total_duration.as_secs_f64() } else { 0.0 }, + avg_latency_us: if total_ops > 0 { total_worker_time.as_micros() as f64 / total_ops as f64 } else { 0.0 }, + errors, + concurrent_connections: concurrent_clients, + }) + } + + // Utility methods for test data preparation + pub async fn prepare_test_data(&self, value_size: usize) -> Result<(), String> { + let prep_value = vec![0x42u8; value_size]; + + for i in 0..10 { + for j in 0..100 { + let key = format!("worker{}:key{}", i, j); + let _ = self.prepare_key(&key, &prep_value).await; + } + } + + Ok(()) + } + + async fn prepare_key(&self, key: &str, value: &[u8]) -> Result<(), String> { + // This would need to be implemented based on the database type + // For now, just return Ok for compilation + Ok(()) + } +} + +impl Clone for StressTester { + fn clone(&self) -> Self { + Self { + addr: self.addr.clone(), + username: self.username.clone(), + password: self.password.clone(), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_benchmark_result_creation() { + let result = BenchmarkResult { + operation: "TEST".to_string(), + total_operations: 1000, + duration: Duration::from_secs(1), + ops_per_second: 1000.0, + avg_latency_us: 1000.0, + errors: 0, + concurrent_connections: 10, + }; + + assert_eq!(result.operation, "TEST"); + assert_eq!(result.total_operations, 1000); + assert_eq!(result.ops_per_second, 1000.0); + } + + #[test] + fn test_stress_tester_creation() { + let tester = StressTester::new("127.0.0.1:6379".to_string()); + assert_eq!(tester.addr, "127.0.0.1:6379"); + } + + #[test] + fn test_stress_tester_clone() { + let tester = StressTester::new("127.0.0.1:6379".to_string()); + let cloned = tester.clone(); + assert_eq!(tester.addr, cloned.addr); + } +} diff --git a/src/oversdb.rs b/src/oversdb.rs new file mode 100644 index 0000000..7882a74 --- /dev/null +++ b/src/oversdb.rs @@ -0,0 +1,131 @@ +use tokio::net::TcpStream; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; + +#[derive(Debug, Clone)] +pub enum CrudOperation { + Create = 0, + Read = 1, + Update = 2, + Delete = 3, +} + +pub struct OversDBClient { + stream: TcpStream, +} + +impl OversDBClient { + pub async fn connect(addr: &str) -> Result { + let stream = TcpStream::connect(addr).await.map_err(|e| e.to_string())?; + Ok(Self { stream }) + } + + pub async fn send_packet( + &mut self, + operation: CrudOperation, + db_name: &str, + table_name: &str, + key: &[u8], + payload: &[u8], + ) -> Result, String> { + // Build packet according to protocol + let mut packet = Vec::new(); + + // [2 bits CRUD][6 bits reserved] + packet.push((operation.clone() as u8) << 6); + + // [1 byte db_len][N bytes db_name] + packet.push(db_name.len() as u8); + packet.extend_from_slice(db_name.as_bytes()); + + // [1 byte table_len][N bytes table_name] + packet.push(table_name.len() as u8); + packet.extend_from_slice(table_name.as_bytes()); + + // [2 bytes key_len][N bytes key] + packet.extend_from_slice(&(key.len() as u16).to_be_bytes()); + packet.extend_from_slice(key); + + // [4 bytes payload_len][N bytes payload] + packet.extend_from_slice(&(payload.len() as u32).to_be_bytes()); + packet.extend_from_slice(payload); + + // Send packet + self.stream.write_all(&packet).await.map_err(|e| e.to_string())?; + + // Read response + match operation { + CrudOperation::Read => { + // Read success byte + let mut success = [0u8; 1]; + self.stream.read_exact(&mut success).await.map_err(|e| e.to_string())?; + + if success[0] == 1 { + // Read value length + let mut len_bytes = [0u8; 4]; + self.stream.read_exact(&mut len_bytes).await.map_err(|e| e.to_string())?; + let len = u32::from_be_bytes(len_bytes) as usize; + + // Read value + let mut value = vec![0u8; len]; + self.stream.read_exact(&mut value).await.map_err(|e| e.to_string())?; + Ok(value) + } else { + Err("Key not found".to_string()) + } + }, + _ => { + // Read success byte for Create/Update/Delete + let mut success = [0u8; 1]; + self.stream.read_exact(&mut success).await.map_err(|e| e.to_string())?; + + if success[0] == 1 { + Ok(vec![1]) + } else { + Err("Operation failed".to_string()) + } + } + } + } + + pub async fn create(&mut self, db_name: &str, table_name: &str, key: &[u8], value: &[u8], ttl: u64) -> Result<(), String> { + let mut payload = Vec::new(); + payload.extend_from_slice(&ttl.to_be_bytes()); + payload.extend_from_slice(value); + + let result = self.send_packet(CrudOperation::Create, db_name, table_name, key, &payload).await?; + + if result[0] == 1 { + Ok(()) + } else { + Err("Create failed".to_string()) + } + } + + pub async fn read(&mut self, db_name: &str, table_name: &str, key: &[u8]) -> Result, String> { + self.send_packet(CrudOperation::Read, db_name, table_name, key, &[]).await + } + + pub async fn update(&mut self, db_name: &str, table_name: &str, key: &[u8], value: &[u8], ttl: u64) -> Result<(), String> { + let mut payload = Vec::new(); + payload.extend_from_slice(&ttl.to_be_bytes()); + payload.extend_from_slice(value); + + let result = self.send_packet(CrudOperation::Update, db_name, table_name, key, &payload).await?; + + if result[0] == 1 { + Ok(()) + } else { + Err("Update failed".to_string()) + } + } + + pub async fn delete(&mut self, db_name: &str, table_name: &str, key: &[u8]) -> Result<(), String> { + let result = self.send_packet(CrudOperation::Delete, db_name, table_name, key, &[]).await?; + + if result[0] == 1 { + Ok(()) + } else { + Err("Delete failed".to_string()) + } + } +} diff --git a/src/redis_client.rs b/src/redis_client.rs new file mode 100644 index 0000000..8e578f8 --- /dev/null +++ b/src/redis_client.rs @@ -0,0 +1,283 @@ +use redis::{Client, Connection, Commands, RedisResult, ConnectionLike, Cmd}; +use tokio::task; + +#[derive(Debug, Clone)] +pub enum RedisOperation { + Set, + Get, + Update, + Delete, +} + +pub struct RedisClient { + client: Client, + connection: Option, +} + +impl RedisClient { + pub async fn connect(addr: &str, password: &str) -> Result { + Self::connect_with_auth(addr, password).await + } + + pub async fn connect_with_auth(addr: &str, password: &str) -> Result { + let addr_clone = addr.to_string(); + let password_clone = password.to_string(); + + // Create connection in blocking task since redis-rs is synchronous + let (client, connection) = task::spawn_blocking(move || { + let redis_url = if password_clone != "" { + format!("redis://:{}@{}", password_clone, addr_clone) + } else { + format!("redis://{}", addr_clone) + }; + + let client = Client::open(redis_url) + .map_err(|e| format!("Failed to create Redis client: {}", e))?; + + let connection = client.get_connection() + .map_err(|e| format!("Failed to connect to Redis: {}", e))?; + + Ok::<(Client, Connection), String>((client, connection)) + }) + .await + .map_err(|e| format!("Task join error: {}", e))??; + + Ok(Self { + client, + connection: Some(connection), + }) + } + + pub async fn set(&mut self, key: &str, value: &[u8]) -> Result<(), String> { + let key = key.to_string(); + let value = value.to_vec(); + + let mut conn = self.connection.take().unwrap(); + let result = task::spawn_blocking(move || { + // Use binary-safe SET command + let result: RedisResult<()> = conn.set(&key, &value); + (conn, result) + }) + .await + .map_err(|e| format!("Task join error: {}", e))?; + + self.connection = Some(result.0); + result.1.map_err(|e| format!("SET failed: {}", e)) + } + + pub async fn get(&mut self, key: &str) -> Result, String> { + let key = key.to_string(); + + let mut conn = self.connection.take().unwrap(); + let result = task::spawn_blocking(move || { + // Use binary-safe GET command + let result: RedisResult> = conn.get(&key); + (conn, result) + }) + .await + .map_err(|e| format!("Task join error: {}", e))?; + + self.connection = Some(result.0); + result.1.map_err(|e| format!("GET failed: {}", e)) + } + + pub async fn update(&mut self, key: &str, value: &[u8]) -> Result<(), String> { + // Redis UPDATE is the same as SET - it overwrites existing values + self.set(key, value).await + } + + pub async fn delete(&mut self, key: &str) -> Result<(), String> { + let key = key.to_string(); + + let mut conn = self.connection.take().unwrap(); + let result = task::spawn_blocking(move || { + let result: RedisResult = conn.del(&key); + (conn, result) + }) + .await + .map_err(|e| format!("Task join error: {}", e))?; + + self.connection = Some(result.0); + result.1.map(|_| ()).map_err(|e| format!("DELETE failed: {}", e)) + } + + pub async fn exists(&mut self, key: &str) -> Result { + let key = key.to_string(); + + let mut conn = self.connection.take().unwrap(); + let result = task::spawn_blocking(move || { + let result: RedisResult = conn.exists(&key); + (conn, result) + }) + .await + .map_err(|e| format!("Task join error: {}", e))?; + + self.connection = Some(result.0); + result.1.map_err(|e| format!("EXISTS failed: {}", e)) + } + + pub async fn ping(&mut self) -> Result { + let mut conn = self.connection.take().unwrap(); + let result = task::spawn_blocking(move || { + let result: RedisResult = redis::cmd("PING").query(&mut conn); + (conn, result) + }) + .await + .map_err(|e| format!("Task join error: {}", e))?; + + self.connection = Some(result.0); + result.1.map_err(|e| format!("PING failed: {}", e)) + } + + pub async fn flush_db(&mut self) -> Result<(), String> { + let mut conn = self.connection.take().unwrap(); + let result = task::spawn_blocking(move || { + let result: RedisResult<()> = redis::cmd("FLUSHDB").query(&mut conn); + (conn, result) + }) + .await + .map_err(|e| format!("Task join error: {}", e))?; + + self.connection = Some(result.0); + result.1.map_err(|e| format!("FLUSHDB failed: {}", e)) + } + + // Additional utility methods for benchmarking + pub async fn set_with_expiry(&mut self, key: &str, value: &[u8], ttl_seconds: u64) -> Result<(), String> { + let key = key.to_string(); + let value = value.to_vec(); + + let mut conn = self.connection.take().unwrap(); + let result = task::spawn_blocking(move || { + let result: RedisResult<()> = conn.set_ex(&key, &value, ttl_seconds); + (conn, result) + }) + .await + .map_err(|e| format!("Task join error: {}", e))?; + + self.connection = Some(result.0); + result.1.map_err(|e| format!("SET with expiry failed: {}", e)) + } + + pub async fn mset(&mut self, pairs: &[(&str, &[u8])]) -> Result<(), String> { + if pairs.is_empty() { + return Ok(()); + } + + let pairs: Vec<(String, Vec)> = pairs.iter() + .map(|(k, v)| (k.to_string(), v.to_vec())) + .collect(); + + let mut conn = self.connection.take().unwrap(); + let result = task::spawn_blocking(move || { + let mut cmd = redis::cmd("MSET"); + for (key, value) in &pairs { + cmd.arg(key).arg(value); + } + let result: RedisResult<()> = cmd.query(&mut conn); + (conn, result) + }) + .await + .map_err(|e| format!("Task join error: {}", e))?; + + self.connection = Some(result.0); + result.1.map_err(|e| format!("MSET failed: {}", e)) + } + + pub async fn mget(&mut self, keys: &[&str]) -> Result>>, String> { + if keys.is_empty() { + return Ok(Vec::new()); + } + + let keys: Vec = keys.iter().map(|k| k.to_string()).collect(); + + let mut conn = self.connection.take().unwrap(); + let result = task::spawn_blocking(move || { + let result: RedisResult>>> = conn.get(&keys); + (conn, result) + }) + .await + .map_err(|e| format!("Task join error: {}", e))?; + + self.connection = Some(result.0); + result.1.map_err(|e| format!("MGET failed: {}", e)) + } + + pub async fn incr(&mut self, key: &str) -> Result { + let key = key.to_string(); + + let mut conn = self.connection.take().unwrap(); + let result = task::spawn_blocking(move || { + let result: RedisResult = conn.incr(&key, 1); + (conn, result) + }) + .await + .map_err(|e| format!("Task join error: {}", e))?; + + self.connection = Some(result.0); + result.1.map_err(|e| format!("INCR failed: {}", e)) + } + + pub async fn ttl(&mut self, key: &str) -> Result { + let key = key.to_string(); + + let mut conn = self.connection.take().unwrap(); + let result = task::spawn_blocking(move || { + let result: RedisResult = redis::cmd("TTL").arg(&key).query(&mut conn); + (conn, result) + }) + .await + .map_err(|e| format!("Task join error: {}", e))?; + + self.connection = Some(result.0); + result.1.map_err(|e| format!("TTL failed: {}", e)) + } + + // Get connection info for benchmarking + pub async fn info(&mut self, section: Option<&str>) -> Result { + let section = section.map(|s| s.to_string()); + + let mut conn = self.connection.take().unwrap(); + let result = task::spawn_blocking(move || { + let result: RedisResult = match section { + Some(s) => redis::cmd("INFO").arg(s).query(&mut conn), + None => redis::cmd("INFO").query(&mut conn), + }; + (conn, result) + }) + .await + .map_err(|e| format!("Task join error: {}", e))?; + + self.connection = Some(result.0); + result.1.map_err(|e| format!("INFO failed: {}", e)) + } + + // Pipeline for batch operations (advanced feature) + pub async fn pipeline_set(&mut self, operations: &[(&str, &[u8])]) -> Result, String> { + if operations.is_empty() { + return Ok(Vec::new()); + } + + let operations: Vec<(String, Vec)> = operations.iter() + .map(|(k, v)| (k.to_string(), v.to_vec())) + .collect(); + + let mut conn = self.connection.take().unwrap(); + let result = task::spawn_blocking(move || { + let mut pipe = redis::pipe(); + let mut pipe = pipe.atomic(); + + for (key, value) in &operations { + pipe = pipe.set(key, value); + } + + let result: RedisResult> = pipe.query(&mut conn); + (conn, result) + }) + .await + .map_err(|e| format!("Task join error: {}", e))?; + + self.connection = Some(result.0); + result.1.map_err(|e| format!("Pipeline SET failed: {}", e)) + } +} diff --git a/src/skytable_client.rs b/src/skytable_client.rs new file mode 100644 index 0000000..d7b7694 --- /dev/null +++ b/src/skytable_client.rs @@ -0,0 +1,135 @@ +use skytable::{Config, query}; +use tokio::task; + +#[derive(Debug, Clone)] +pub enum SkytableOperation { + Set, + Get, + Update, + Delete, +} + +pub struct SkytableClient { + connection: Option, +} + +impl SkytableClient { + pub async fn connect(addr: &str, login: &str, password: &str) -> Result { + Self::connect_with_auth(addr, login, password).await + } + + pub async fn connect_with_auth(addr: &str, username: &str, password: &str) -> Result { + let addr_clone = addr.to_string(); + let username_clone = username.to_string(); + let password_clone = password.to_string(); + + // Create connection in blocking task since Skytable is synchronous + let connection = task::spawn_blocking(move || { + Config::new(&addr_clone, 2003, &username_clone, &password_clone).connect() + }) + .await + .map_err(|e| format!("Task join error: {}", e))? + .map_err(|e| format!("Failed to connect to Skytable: {}", e))?; + + Ok(Self { + connection: Some(connection), + }) + } + + pub async fn set(&mut self, key: &str, value: &[u8]) -> Result<(), String> { + let key = key.to_string(); + let value = value.to_vec(); + + let mut conn = self.connection.take().unwrap(); + let result = task::spawn_blocking(move || { + let q = query!("INSERT INTO benchmark.kvstore (?, ?)", key, value); + let result = conn.query_parse::<()>(&q); + (conn, result) + }) + .await + .map_err(|e| format!("Task join error: {}", e))?; + + self.connection = Some(result.0); + result.1.map_err(|e| format!("SET failed: {}", e)) + } + + pub async fn get(&mut self, key: &str) -> Result<(Vec,), String> { + let key = key.to_string(); + + let mut conn = self.connection.take().unwrap(); + let result = task::spawn_blocking(move || { + let q = query!("SELECT v FROM benchmark.kvstore WHERE k = ?", key); + let result = conn.query_parse::<(Vec,)>(&q); + (conn, result) + }) + .await + .map_err(|e| format!("Task join error: {}", e))?; + + self.connection = Some(result.0); + result.1.map_err(|e| format!("GET failed: {}", e)) + } + + pub async fn update(&mut self, key: &str, value: &[u8]) -> Result<(), String> { + let key = key.to_string(); + let value = value.to_vec(); + + let mut conn = self.connection.take().unwrap(); + let result = task::spawn_blocking(move || { + let q = query!("UPDATE benchmark.kvstore SET v = ? WHERE k = ?", value, key); + let result = conn.query_parse::<()>(&q); + (conn, result) + }) + .await + .map_err(|e| format!("Task join error: {}", e))?; + + self.connection = Some(result.0); + result.1.map_err(|e| format!("UPDATE failed: {}", e)) + } + + pub async fn delete(&mut self, key: &str) -> Result<(), String> { + let key = key.to_string(); + + let mut conn = self.connection.take().unwrap(); + let result = task::spawn_blocking(move || { + let q = query!("DELETE FROM benchmark.kvstore WHERE k = ?", key); + let result = conn.query_parse::<()>(&q); + (conn, result) + }) + .await + .map_err(|e| format!("Task join error: {}", e))?; + + self.connection = Some(result.0); + result.1.map_err(|e| format!("DELETE failed: {}", e)) + } + + pub async fn setup_benchmark_space(&mut self) -> Result<(), String> { + let mut conn = self.connection.take().unwrap(); + let result = task::spawn_blocking(move || { + // Create space + let create_space = query!("CREATE SPACE IF NOT EXISTS benchmark"); + let _ = conn.query_parse::(&create_space); // Ignore errors if exists + + // Create model with binary support + let create_model = query!("CREATE MODEL IF NOT EXISTS benchmark.kvstore (k: string, v: binary)"); + let result = conn.query_parse::(&create_model); + + (conn, result) + }) + .await + .map_err(|e| format!("Task join error: {}", e))?; + + self.connection = Some(result.0); + + // Ignore errors if space/model already exists + match result.1 { + Ok(_) => Ok(()), + Err(e) => { + if e.to_string().contains("already exists") || e.to_string().contains("exists") { + Ok(()) + } else { + Err(format!("Setup failed: {}", e)) + } + } + } + } +}