-
Notifications
You must be signed in to change notification settings - Fork 0
/
correlation_database.py
78 lines (70 loc) · 3.36 KB
/
correlation_database.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# correlation_database.py
import logging
import sqlite3
import json
logger = logging.getLogger()
class CorrelationDatabase:
def __init__(self, db_path: str):
self.conn = sqlite3.connect(db_path)
self.create_table()
def create_table(self):
self.conn.execute("""
CREATE TABLE IF NOT EXISTS correlations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
symbol_id INTEGER NOT NULL,
timeframe_id INTEGER NOT NULL,
indicator_config_id INTEGER NOT NULL,
lag INTEGER NOT NULL,
correlation_value REAL NOT NULL,
FOREIGN KEY (symbol_id) REFERENCES symbols(id),
FOREIGN KEY (timeframe_id) REFERENCES timeframes(id),
FOREIGN KEY (indicator_config_id) REFERENCES indicator_configs(id),
UNIQUE(symbol_id, timeframe_id, indicator_config_id, lag)
);""")
self.conn.execute("CREATE INDEX IF NOT EXISTS idx_correlations ON correlations (symbol_id, timeframe_id, indicator_config_id, lag);")
self.conn.commit()
def insert_correlation(self, symbol, timeframe, indicator_config, lag, value):
cursor = self.conn.cursor()
symbol_id = self._get_or_create_id('symbols', 'symbol', symbol, cursor)
timeframe_id = self._get_or_create_id('timeframes', 'timeframe', timeframe, cursor)
indicator_config_id = self._get_indicator_config_id(indicator_config, cursor)
try:
cursor.execute("""
INSERT OR REPLACE INTO correlations (symbol_id, timeframe_id, indicator_config_id, lag, correlation_value)
VALUES (?, ?, ?, ?, ?);""", (symbol_id, timeframe_id, indicator_config_id, lag, value))
self.conn.commit()
except sqlite3.Error as e:
logger.error(f"SQLite insertion error: {e}")
raise
def get_all_correlations(self):
cursor = self.conn.cursor()
cursor.execute("""
SELECT s.symbol, t.timeframe, i.name, ic.config, c.lag, c.correlation_value
FROM correlations c
JOIN symbols s ON c.symbol_id = s.id
JOIN timeframes t ON c.timeframe_id = t.id
JOIN indicator_configs ic ON c.indicator_config_id = ic.id
JOIN indicators i ON ic.indicator_id = i.id;""")
return cursor.fetchall()
def _get_or_create_id(self, table, column, value, cursor):
cursor.execute(f"SELECT id FROM {table} WHERE {column} = ?", (value,))
res = cursor.fetchone()
if res:
return res[0]
cursor.execute(f"INSERT INTO {table} ({column}) VALUES (?)", (value,))
return cursor.lastrowid
def _get_indicator_config_id(self, indicator_config, cursor):
indicator_name = indicator_config['indicator_name']
params = json.dumps(indicator_config['params'], sort_keys=True)
cursor.execute("""
SELECT ic.id FROM indicator_configs ic
JOIN indicators i ON ic.indicator_id = i.id
WHERE i.name = ? AND ic.config = ?;""", (indicator_name, params))
res = cursor.fetchone()
if res:
return res[0]
else:
logger.error(f"Indicator configuration not found for {indicator_name} with params {params}")
raise ValueError(f"Indicator configuration not found for {indicator_name} with params {params}")
def close(self):
self.conn.close()