Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Integrated Java KV interface #5

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified __pycache__/pylib.cpython-38.pyc
Binary file not shown.
178 changes: 178 additions & 0 deletions bencher/src/demo_javakv_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
use super::Cli;
use base64::encode;
use rand::seq::SliceRandom;
use rand::{Rng, SeedableRng};
use reqwest::Response;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::fmt::{format, Debug};
use std::fs::{self, File};
use std::io::BufReader;
use std::process::{self, Command as Process};
use std::time::{SystemTime, UNIX_EPOCH};
use std::io::Write;

use super::PlatformOpsBind;
use crate::parse_app::App;
use crate::{Metric, PlatformOps, SpecTarget, BUCKET};

#[derive(Default)]
pub struct JavaKvTest(Option<PlatformOpsBind>);

impl SpecTarget for JavaKvTest {
fn set_platform(&mut self, platform: PlatformOpsBind) {
self.0 = Some(platform);
}

fn get_platform(&mut self) -> &mut PlatformOpsBind {
self.0.as_mut().unwrap()
}

async fn prepare_once(&mut self, seed: String, cli: Cli) {
self.get_platform().remove_all_fn().await;
self.get_platform().upload_fn("javakv_test", "").await;
}

async fn call_once(&mut self, cli: Cli) -> Metric {

let arg = Args {
kv_test_arg: "javakv_test_arg".to_string(),
};

let start_call_ms = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_millis() as u64;
let output = self
.get_platform()
.call_fn("javakv_test", "javakv", &serde_json::to_value(arg).unwrap())
.await;

print!("output {}\n",output);
let res: serde_json::Value = serde_json::from_str(&output).unwrap();
let req_arrive_time = res.get("req_arrive_time").unwrap().as_u64().unwrap();
let bf_exec_time = res.get("bf_exec_time").unwrap().as_u64().unwrap();
let recover_begin_time = res.get("recover_begin_time").unwrap().as_u64().unwrap();
let fn_start_ms = res.get("fn_start_time").unwrap().as_u64().unwrap();
let fn_end_ms = res.get("fn_end_time").unwrap().as_u64().unwrap();
let receive_resp_time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_millis() as u64;

println!(
"\ntotal request latency: {}",
receive_resp_time - start_call_ms
);

println!("- req trans time: {}", req_arrive_time - start_call_ms);
println!("- app verify time: {}", bf_exec_time - req_arrive_time);
println!("- cold start time: {}", recover_begin_time - bf_exec_time);
println!("- cold start time2: {}", fn_start_ms - recover_begin_time);
println!("- exec time:{}", fn_end_ms - fn_start_ms);
if fn_end_ms > receive_resp_time {
println!(
"- system time is not synced, lag with {} ms",
fn_end_ms - receive_resp_time
);
} else {
println!("- receive resp time: {}", receive_resp_time - fn_end_ms);
}

Metric {
start_call_time: start_call_ms,
req_arrive_time,
bf_exec_time,
recover_begin_time,
fn_start_time: fn_start_ms,
fn_end_time: fn_end_ms,
receive_resp_time,
}
}

async fn prepare_first_call(&mut self, seed: String, cli: Cli) {
self.get_platform().remove_all_fn().await;
}

async fn call_first_call(&mut self, cli: Cli) {
let mut metrics = vec![];
for _ in 0..20 {
self.get_platform().upload_fn("javakv_test", "").await;
metrics.push(self.call_once(cli.clone()).await);
}

println!(
"\ntotal request latency: {}",
metrics.iter().map(|v| v.get_total_req()).sum::<u64>() as f32 / metrics.len() as f32
);

println!(
"- req trans time: {}",
metrics.iter().map(|v| v.get_req_trans_time()).sum::<u64>() as f32
/ metrics.len() as f32
);

println!(
"- app verify time: {}",
metrics.iter().map(|v| v.get_app_verify_time()).sum::<u64>() as f32
/ metrics.len() as f32
);

println!(
"- cold start time: {}",
metrics.iter().map(|v| v.get_cold_start_time()).sum::<u64>() as f32
/ metrics.len() as f32
);

println!(
"- cold start time2: {}",
metrics
.iter()
.map(|v| v.get_cold_start_time2())
.sum::<u64>() as f32
/ metrics.len() as f32
);

println!(
"- exec time: {}",
metrics.iter().map(|v| v.get_exec_time()).sum::<u64>() as f32 / metrics.len() as f32
);
// println!("- app verify time: {}", bf_exec_time - req_arrive_time);
// println!("- cold start time: {}", recover_begin_time - bf_exec_time);
// println!("- cold start time2: {}", fn_start_ms - recover_begin_time);
// println!("- exec time:{}", fn_end_ms - fn_start_ms);
}

fn app(&self) -> App {
App::JavaKvTest
}
}

#[derive(Serialize, Deserialize, Debug)]
struct Args {
kv_test_arg: String,
}

#[derive(Serialize, Deserialize, Debug)]
struct Resp {
javakv_test: String,
}

// 将字符串转换为u64哈希值
fn hash_str(s: &str) -> u64 {
use std::hash::{Hash, Hasher};
let mut hasher = std::collections::hash_map::DefaultHasher::new();
s.hash(&mut hasher);
hasher.finish()
}

fn generate_random_text<R: Rng>(rng: &mut R, length: usize) -> Vec<u8> {
let mut text = Vec::with_capacity(length);
let chars: &[u8] = b"abcdefghijklmnopqrstuvwxyz ";

for _ in 0..length {
text.push(*chars.choose(rng).unwrap());
}

text
}
4 changes: 4 additions & 0 deletions bencher/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod demo_img_resize;
mod demo_javakv_test;
mod demo_parallel;
mod demo_sequential;
mod demo_word_count;
Expand Down Expand Up @@ -106,6 +107,7 @@ enum SpecTargetBind {
WordCount(demo_word_count::WordCount),
Parallel(demo_parallel::Parallel),
Sequential(demo_sequential::Sequential),
JavaKvTest(demo_javakv_test::JavaKvTest),
}

/// unit: ms
Expand Down Expand Up @@ -223,6 +225,8 @@ async fn main() -> Result<(), GooseError> {
SpecTargetBind::from(demo_parallel::Parallel::default())
} else if cli.sequential > 0 {
SpecTargetBind::from(demo_sequential::Sequential::default())
} else if cli.javakv_test > 0 {
SpecTargetBind::from(demo_javakv_test::JavaKvTest::default())
} else {
unreachable!()
};
Expand Down
3 changes: 3 additions & 0 deletions bencher/src/parse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ pub struct Cli {
#[arg(long, action = clap::ArgAction::Count)]
pub sequential: u8,

#[arg(long, action = clap::ArgAction::Count)]
pub javakv_test: u8,

#[arg(long, action = clap::ArgAction::Count)]
pub with_ow: u8,

Expand Down
2 changes: 2 additions & 0 deletions bencher/src/parse_app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub enum App {
WordCount,
Parallel,
Sequential,
JavaKvTest,
}

impl From<&Cli> for App {
Expand All @@ -30,6 +31,7 @@ impl ToString for App {
App::WordCount => "word_count".to_owned(),
App::Parallel => "parallel".to_owned(),
App::Sequential => "sequential".to_owned(),
App::JavaKvTest => "java_kv_test".to_owned(),
}
}
}
5 changes: 4 additions & 1 deletion demos/_java_serverless_lib/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
</parent>

<properties>
<java.version>17</java.version>
<!-- <java.version>1.8</java.version> -->
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<spring.boot.version>2.3.12.RELEASE</spring.boot.version>
<protobuf.version>3.15.8</protobuf.version>
</properties>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,9 @@ public RpcHandleOwner rpcHandleOwner() {
public CracManager cracManager() {
return new CracManager();
}

@Bean
public KvBatch kvBatch(){
return new KvBatch("javakv-test", "javakv-test-func", 9999);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ConfigurableApplicationContext;;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.DefaultApplicationArguments;
import org.springframework.core.env.Environment;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package io.serverless_lib;

import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;

import process_rpc_proto.ProcessRpcProto.KvRequest;
import process_rpc_proto.ProcessRpcProto.KvRequests;
import process_rpc_proto.ProcessRpcProto.KvResponses;
import process_rpc_proto.ProcessRpcProto.KvRequest.KvPutRequest;
import process_rpc_proto.ProcessRpcProto.KvRequest.KvGetRequest;
import process_rpc_proto.ProcessRpcProto.KvRequest.KvDeleteRequest;
import process_rpc_proto.ProcessRpcProto.KvRequest.KvLockRequest;
import process_rpc_proto.ProcessRpcProto.KvPair;
import process_rpc_proto.ProcessRpcProto.KeyRange;


public class KvBatch {
private KvRequests batchArgs; // 用于发送批处理请求的KvRequests
private KvResponses result; // 用于接收响应的KvResponses
private int taskId; // 任务 ID

@Autowired
UdsBackend uds;

// 构造函数,初始化 KvRequests 和 KvResponses
public KvBatch(String app, String func, int taskId) {
this.batchArgs = KvRequests.newBuilder().setApp(app).setFunc(func).setPrevKvOpeid(8888).build();
this.result = KvResponses.newBuilder().build();
this.taskId = taskId;
// TODO 注入this
// uds.kvBatch = this;
}

// 设置批处理请求
public void setBatchArgs(String app, String func, List<KvRequest> requests, long prevKvOpeId) {
KvRequests.Builder builder = KvRequests.newBuilder();
builder.setApp(app).setFunc(func).addAllRequests(requests).setPrevKvOpeid(prevKvOpeId);
this.batchArgs = builder.build();
}

// 添加单个 PUT 请求
public void addPutRequest(byte[] key, byte[] value) {
KvPair kvPair = KvPair.newBuilder().setKey(com.google.protobuf.ByteString.copyFrom(key))
.setValue(com.google.protobuf.ByteString.copyFrom(value)).build();
KvPutRequest putRequest = KvPutRequest.newBuilder().setKv(kvPair).build();
KvRequest request = KvRequest.newBuilder().setSet(putRequest).build();

this.batchArgs = this.batchArgs.toBuilder().addRequests(request).build();
}

// 添加单个 GET 请求
public void addGetRequest(byte[] startKey, byte[] endKey) {
KeyRange range = KeyRange.newBuilder().setStart(com.google.protobuf.ByteString.copyFrom(startKey))
.setEnd(com.google.protobuf.ByteString.copyFrom(endKey)).build();
KvGetRequest getRequest = KvGetRequest.newBuilder().setRange(range).build();
KvRequest request = KvRequest.newBuilder().setGet(getRequest).build();

this.batchArgs = this.batchArgs.toBuilder().addRequests(request).build();
}

// 添加单个 DELETE 请求
public void addDeleteRequest(byte[] startKey, byte[] endKey) {
KeyRange range = KeyRange.newBuilder().setStart(com.google.protobuf.ByteString.copyFrom(startKey))
.setEnd(com.google.protobuf.ByteString.copyFrom(endKey)).build();
KvDeleteRequest deleteRequest = KvDeleteRequest.newBuilder().setRange(range).build();
KvRequest request = KvRequest.newBuilder().setDelete(deleteRequest).build();

this.batchArgs = this.batchArgs.toBuilder().addRequests(request).build();
}

// 添加单个 LOCK 请求
public void addLockRequest(boolean readOrWrite, List<Integer> releaseIds, byte[] startKey, byte[] endKey) {
KeyRange range = KeyRange.newBuilder().setStart(com.google.protobuf.ByteString.copyFrom(startKey))
.setEnd(com.google.protobuf.ByteString.copyFrom(endKey)).build();
KvLockRequest.Builder lockBuilder = KvLockRequest.newBuilder().setReadOrWrite(readOrWrite).setRange(range);
lockBuilder.addAllReleaseId(releaseIds);
KvRequest request = KvRequest.newBuilder().setLock(lockBuilder.build()).build();

this.batchArgs = this.batchArgs.toBuilder().addRequests(request).build();
}

// 发送请求并接收响应
public void sendBatch() throws Exception {
// 打包成 UdsPack 发送数据
UdsPack pack = new UdsPack(this.batchArgs, this.taskId);

System.out.println("KvBatch sendBatch");
// 发送请求
uds.send(pack);

// TODO 从 udsbackend 接收结果数据
// this.result = ;
}

// 获取请求内容
public KvRequests getBatchArgs() {
return this.batchArgs;
}

public Integer getTackId() {
return this.taskId;
}

// 获取响应内容
public KvResponses getResult() {
return this.result;
}

public void setResult(KvResponses result){
this.result = result;
}
}



/*
用法示例:
KvBatch kvBatch = new KvBatch();
kvBatch.addPutRequest("key1".getBytes(), "value1".getBytes());
kvBatch.addGetRequest("key1".getBytes(), "key2".getBytes());
KvResponses responses = kvBatch.sendBatch();
*/
Loading