structure designed & storage is done

This commit is contained in:
Mahno 2025-07-09 15:30:52 +08:00
parent da14c2cae8
commit a387dd48d1
18 changed files with 2186 additions and 0 deletions

4
.gitignore vendored Normal file
View File

@ -0,0 +1,4 @@
config/
venv/*
**/target/
.vscode/

View File

@ -1,2 +1,14 @@
# bill-gates
运行本项目前,确保系统上已配置好以下软件:
```
bash
python3
xmake
```
## 运行方法:
直接执行start.sh
```bash
./start.sh
```

31
build.sh Executable file
View File

@ -0,0 +1,31 @@
#!/bin/bash
BLUE='\033[0;34m'
GREEN='\033[0;32m'
NC='\033[0m' # No Color
# Function to print timestamp
timestamp(){
date "+%Y-%m-%d %H:%M:%S"
}
# Begin build process
echo -e "${BLUE}$(timestamp)[BUILD]${NC}"
# if any of the commands fail, exit
set -e
# set up python venv
python3 -m venv venv
source venv/bin/activate
# Satisfy python dependencies
python3 -m pip install -r gateway/requirements.txt
echo -e "${GREEN}$(timestamp)[BUILD FINISHED]${NC}"
# Build storage crate
pushd storage
cargo build --release
popd
# Run the project

0
daemon.py Normal file
View File

32
datasource/gateway.py Normal file
View File

@ -0,0 +1,32 @@
import tushare as ts
import pandas as pd
from datetime import datetime, timedelta
# Read config from config.yaml
def read_config():
import yaml
with open('config/config.yaml', 'r') as file:
config = yaml.safe_load(file)
# Ensure the config has the required keys
if 'gateway' not in config or 'token' not in config['gateway']:
raise KeyError("Missing required config key: gateway.token")
return config
def main():
config = read_config()
token = config['gateway']['token']
# Initialize Tushare with the token
ts.set_token(token)
pro = ts.pro_api()
# Get the last trading day
last_trading_day = (datetime.now() - timedelta(days=1)).strftime('%Y%m%d')
# Fetch stock data for the last trading day
df = pro.daily(trade_date=last_trading_day)
# Print the DataFrame
print(df)
if __name__ == "__main__":
main()

0
datasource/job.py Normal file
View File

0
datasource/main.py Normal file
View File

View File

@ -0,0 +1 @@
tushare

0
datasource/rpc.py Normal file
View File

0
datasource/storage.py Normal file
View File

0
gateway/gateway.py Normal file
View File

1743
storage/Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

16
storage/Cargo.toml Normal file
View File

@ -0,0 +1,16 @@
[package]
name = "storage"
version = "0.1.0"
edition = "2024"
[dependencies]
saphyr = "0.0.6"
log = "0.4"
env_logger = "0.11"
tokio = { version = "1", features = ["full"] }
axum = "0.8"
serde_json = "1.0"
home = "0.5"
serde = {version = "1.0", features = ["derive"]}
moka = {version = "0.12",features = ["future"]}
rocksdb ={ version = "0.23"}

152
storage/src/main.rs Normal file
View File

@ -0,0 +1,152 @@
use std::{env, fs, net::TcpListener, path::Path};
use std::process::exit;
use saphyr::{LoadableYamlNode, Yaml};
use log::{info, error};
use env_logger;
use tokio::sync::mpsc;
use home::home_dir;
mod rpc;
mod msg;
mod storage;
#[derive(Debug)]
struct Config {
rpc_url: String,
lru_timeout: u64,
lru_capacity: u64,
lru_path: String,
}
fn parse_capacity(s: &str) -> Result<u64, String> {
let s = s.trim().to_uppercase();
match s {
s if s.ends_with("GB") => {
match s.strip_suffix("GB") {
Some(num) => num.trim().parse::<u64>().map_err(|e| e.to_string()).map(|n| n * 1024 * 1024 * 1024),
None => Err("Invalid GB format".to_string()),
}
}
s if s.ends_with("MB") => {
match s.strip_suffix("MB") {
Some(num) => num.trim().parse::<u64>().map_err(|e| e.to_string()).map(|n| n * 1024 * 1024),
None => Err("Invalid MB format".to_string()),
}
}
s if s.ends_with("KB") => {
match s.strip_suffix("KB") {
Some(num) => num.trim().parse::<u64>().map_err(|e| e.to_string()).map(|n| n * 1024),
None => Err("Invalid KB format".to_string()),
}
}
_ => s.parse::<u64>().map_err(|e| e.to_string()),
}
}
fn read_config(config_path: &str) -> Result<Config, String> {
let config_str = fs::read_to_string(config_path)
.map_err(|e| format!("Failed to read {}: {}", config_path, e))?;
let docs = Yaml::load_from_str(&config_str)
.map_err(|e| format!("Failed to parse {}: {}", config_path, e))?;
let doc = &docs[0];
let storage = &doc["storage"];
let rpc_url = storage["rpc"]["url"]
.as_str()
.ok_or("storage.rpc.url is missing or not a string")?
.to_string();
let lru_timeout = storage["lru"]["timeout"]
.as_integer()
.ok_or("storage.lru.timeout is missing or not an integer")? as u64;
let lru_capacity_str = storage["lru"]["capacity"]
.as_str()
.ok_or("storage.lru.capacity is missing or not a string")?;
let lru_capacity = parse_capacity(lru_capacity_str)
.map_err(|e| format!("storage.lru.capacity parse error: {}", e))?;
let lru_path = storage["lru"]["path"]
.as_str()
.ok_or("storage.lru.path is missing or not a string")?
.to_string();
let lru_path = expand_home(&lru_path);
if rpc_url.is_empty() {
return Err("storage.rpc.url is empty".to_string());
}
if lru_timeout == 0 {
return Err("storage.lru.timeout must be > 0".to_string());
}
if lru_capacity == 0 {
return Err("storage.lru.capacity must be > 0".to_string());
}
if lru_path.is_empty() {
return Err("storage.lru.path is empty".to_string());
}
if TcpListener::bind(&rpc_url).is_err() {
return Err(format!("Cannot bind to address: {}", rpc_url));
}
let db_dir = Path::new(&lru_path);
if !db_dir.exists() {
if let Err(e) = fs::create_dir_all(db_dir) {
return Err(format!("Cannot create rocksDB directory {}: {}", lru_path, e));
}
}
let test_file = db_dir.join(".test_write");
if fs::write(&test_file, b"test").is_err() {
return Err(format!("Cannot write to rocksDB directory: {}", lru_path));
}
let _ = fs::remove_file(&test_file);
Ok(Config { rpc_url, lru_timeout, lru_capacity, lru_path })
}
fn expand_home(path: &str) -> String {
match path.strip_prefix("~/") {
Some(stripped) => match home_dir() {
Some(home) => home.join(stripped).to_string_lossy().to_string(),
None => path.to_string(),
},
None => path.to_string(),
}
}
use storage::storage_task;
#[tokio::main]
async fn main() {
env_logger::init();
// 解析命令行参数
let mut args = env::args().skip(1);
let mut config_path = "config/config.yaml".to_string();
while let Some(arg) = args.next() {
if arg == "-c" {
if let Some(path) = args.next() {
config_path = path;
} else {
error!("FATAL: -c option requires a path argument");
exit(1);
}
}
}
let config = match read_config(&config_path) {
Ok(cfg) => {
info!("Config loaded and validated: {:?}", cfg);
cfg
}
Err(e) => {
error!("FATAL: {}", e);
exit(1);
}
};
// 创建 channel
let (tx, rx) = mpsc::channel(128);
let mem_bytes = config.lru_capacity;
let ttl_secs = config.lru_timeout;
let db_path = config.lru_path.clone();
// 启动 storage 任务
tokio::spawn(storage_task(rx, mem_bytes, ttl_secs, db_path));
// 启动 RPC 服务
rpc::start_rpc_server(&config.rpc_url, tx).await;
}

8
storage/src/msg.rs Normal file
View File

@ -0,0 +1,8 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum StorageMsg {
Get { key: String },
Set { key: String, value: Vec<u8> },
}

84
storage/src/rpc.rs Normal file
View File

@ -0,0 +1,84 @@
use axum::{
extract::{ConnectInfo, State}, http::StatusCode, routing::post, Json, Router
};
use serde_json::Value;
use tokio::sync::mpsc::Sender;
use std::sync::Arc;
use log::{info, error};
use std::net::{SocketAddr, IpAddr};
use crate::msg::StorageMsg;
#[derive(Clone)]
pub struct RPCState {
pub tx: Sender<StorageMsg>,
pub allowed_host: String,
}
pub async fn start_rpc_server(addr: &str, tx: Sender<StorageMsg>) {
// 拆分host和port
let (host_part, port_part) = match addr.rsplit_once(':') {
Some((h, p)) => (h.to_string(), p.parse::<u16>().expect("Invalid port")),
None => panic!("Invalid addr format, expected host:port"),
};
// 判断host是否为IP
let listen_ip = if host_part.parse::<IpAddr>().is_ok() {
host_part.clone()
} else {
"0.0.0.0".to_string()
};
let listen_addr = format!("{}:{}", listen_ip, port_part);
let state = Arc::new(RPCState {
tx,
allowed_host: host_part,
});
let app = Router::new()
.route("/", post(handle_rpc))
.with_state(state)
.into_make_service_with_connect_info::<SocketAddr>();
info!("HTTP JSON-RPC server listening on {}", listen_addr);
let listener=tokio::net::TcpListener::bind(listen_addr).await.unwrap();
axum::serve(listener,app).await.unwrap();
}
async fn handle_rpc(
State(state): State<Arc<RPCState>>,
ConnectInfo(peer_addr): ConnectInfo<SocketAddr>,
Json(payload): Json<Value>,
) -> (StatusCode, Json<Value>) {
let allowed = state.allowed_host == "0.0.0.0"
|| peer_addr.ip().to_string() == state.allowed_host;
if !allowed {
error!("Connection from {} not allowed", peer_addr);
return (
StatusCode::FORBIDDEN,
Json(serde_json::json!({"error": "host/port not allowed"}))
);
}
let msg: Result<StorageMsg, _> = serde_json::from_value(payload);
match msg {
Ok(msg) => {
// msg is StorageMsg
if let Err(e) = state.tx.send(msg).await {
error!("Failed to send request to storage: {}", e);
return (
StatusCode::BAD_GATEWAY,
Json(serde_json::json!({"error": "internal error"}))
);
}
(StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
}
Err(e) => {
error!("Invalid message: {}", e);
(
StatusCode::BAD_REQUEST,
Json(serde_json::json!({"error": "invalid message"}))
)
}
}
}

103
storage/src/storage.rs Normal file
View File

@ -0,0 +1,103 @@
use crate::msg::StorageMsg;
use log::{error, info};
use moka::future::Cache;
use rocksdb::{DB, Options};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::Receiver;
pub struct Storage {
cache: Cache<String, Vec<u8>>,
db: Arc<DB>,
}
impl Storage {
pub fn new(mem_bytes: u64, ttl_secs: u64, db_path: &str) -> Self {
let mut opts = Options::default();
opts.create_if_missing(true);
info!("Opening RocksDB at {}", db_path);
let db = Arc::new(DB::open(&opts, db_path).expect("open rocksdb failed"));
let _db = db.clone();
info!("Initializing in-memory cache: max_capacity={} bytes, ttl={} secs", mem_bytes, ttl_secs);
let cache = Cache::builder()
.max_capacity(mem_bytes / 128)
.time_to_live(Duration::from_secs(ttl_secs))
.eviction_listener(move |key: Arc<String>, value, _cause| {
let db = db.clone();
let key = key.clone();
info!("Evicting key from memory: {} (persisting to RocksDB)", key);
if let Err(e) = db.put(key.as_bytes(), value) {
error!("Failed to persist evicted key to rocksdb: {}", e);
}
})
.build();
info!("Storage initialized");
Storage { cache, db: _db }
}
pub async fn get(&self, key: &String) -> Option<Vec<u8>> {
if let Some(val) = self.cache.get(key).await {
info!("Cache hit for key: {}", key);
return Some(val);
}
info!("Cache miss for key: {}, loading from RocksDB", key);
let db = self.db.clone();
let key_owned = key.clone();
let v = tokio::task::spawn_blocking(move || db.get(key_owned.as_bytes()).ok().flatten())
.await
.ok()
.flatten();
if let Some(bytes) = v.clone() {
info!("Loaded key from RocksDB: {} ({} bytes)", key, bytes.len());
self.cache.insert(key.clone(), bytes.clone()).await;
return Some(bytes);
}
info!("Key not found in RocksDB: {}", key);
None
}
pub async fn set(&self, key: String, value: Vec<u8>) {
info!("Set key: {} ({} bytes)", key, value.len());
self.cache.insert(key.clone(), value.clone()).await;
let db = self.db.clone();
tokio::spawn(async move {
let _ = tokio::task::spawn_blocking(move || {
let res = db.put(key.as_bytes(), value);
if let Err(e) = res {
error!("Failed to persist key to rocksdb (set): {}", e);
} else {
info!("Persisted key to rocksdb (set)");
}
}).await;
});
}
}
pub async fn storage_task(
mut rx: Receiver<StorageMsg>,
mem_bytes: u64,
ttl_secs: u64,
db_path: String,
) {
let storage = Storage::new(mem_bytes, ttl_secs, &db_path);
// storage.register_eviction();
while let Some(msg) = rx.recv().await {
match msg {
StorageMsg::Get { key } => {
let result = storage.get(&key).await;
info!(
"GET key={} result={:?}",
key,
result.as_ref().map(|v| v.len())
);
// TODO: 返回结果给请求方可用oneshot channel等
}
StorageMsg::Set { key, value } => {
storage.set(key, value).await;
info!("SET key");
}
}
}
}

0
strategy/xmake.lua Normal file
View File