Skip to content

Commit

Permalink
Merge pull request #38 from CosmicTrader/master
Browse files Browse the repository at this point in the history
Numba Impementation for Order Block Calculation
  • Loading branch information
joshyattridge authored May 20, 2024
2 parents 58bb145 + 271492a commit 30e40ec
Show file tree
Hide file tree
Showing 22 changed files with 35,989 additions and 24,556 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ Bottom = bottom of the order block<br>
OBVolume = volume + 2 last volumes amounts<br>
Percentage = strength of order block (min(highVolume, lowVolume)/max(highVolume,lowVolume))<br>


### Liquidity

```python
Expand Down
19 changes: 16 additions & 3 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,21 @@
long_description_content_type="text/markdown",
long_description=LONG_DESCRIPTION,
packages=["smartmoneyconcepts"],
install_requires=["pandas==2.0.2", "numpy==1.24.3"],
keywords=['smart', 'money', 'concepts', 'ict', 'indicators', 'trading', 'forex', 'stocks', 'crypto', 'order', 'blocks', 'liquidity'],
install_requires=["pandas==2.0.2", "numpy==1.24.3", "numba==0.59.1"],
keywords=[
"smart",
"money",
"concepts",
"ict",
"indicators",
"trading",
"forex",
"stocks",
"crypto",
"order",
"blocks",
"liquidity",
],
url="https://github.com/joshyattridge/smartmoneyconcepts",
classifiers=[
"Development Status :: 1 - Planning",
Expand All @@ -28,5 +41,5 @@
"Operating System :: Unix",
"Operating System :: MacOS :: MacOS X",
"Operating System :: Microsoft :: Windows",
]
],
)
199 changes: 107 additions & 92 deletions smartmoneyconcepts/smc.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from pandas import DataFrame, Series
from datetime import datetime


def inputvalidator(input_="ohlc"):
def dfcheck(func):
@wraps(func)
Expand Down Expand Up @@ -163,35 +162,38 @@ def swing_highs_lows(cls, ohlc: DataFrame, swing_length: int = 50) -> Series:
),
)

continue_ = True
while continue_:
while True:
positions = np.where(~np.isnan(swing_highs_lows))[0]
continue_ = False
for i in range(len(positions) - 1):
current, next = (
swing_highs_lows[positions[i]],
swing_highs_lows[positions[i + 1]],
)
high, low = (
ohlc["high"].iloc[positions[i]],
ohlc["low"].iloc[positions[i]],
)
next_high, next_low = (
ohlc["high"].iloc[positions[i + 1]],
ohlc["low"].iloc[positions[i + 1]],
)
if current == -1 and next == -1:
remove_index = positions[i] if low > next_low else positions[i + 1]
swing_highs_lows[remove_index] = np.nan
continue_ = True
elif current == 1 and next == 1:
remove_index = (
positions[i] if high < next_high else positions[i + 1]
)
swing_highs_lows[remove_index] = np.nan
continue_ = True

if len(positions) < 2:
break

current = swing_highs_lows[positions[:-1]]
next = swing_highs_lows[positions[1:]]

highs = ohlc["high"].iloc[positions[:-1]].values
lows = ohlc["low"].iloc[positions[:-1]].values

next_highs = ohlc["high"].iloc[positions[1:]].values
next_lows = ohlc["low"].iloc[positions[1:]].values

index_to_remove = np.zeros(len(positions), dtype=bool)

consecutive_highs = (current == 1) & (next == 1)
index_to_remove[:-1] |= consecutive_highs & (highs < next_highs)
index_to_remove[1:] |= consecutive_highs & (highs >= next_highs)

consecutive_lows = (current == -1) & (next == -1)
index_to_remove[:-1] |= consecutive_lows & (lows > next_lows)
index_to_remove[1:] |= consecutive_lows & (lows <= next_lows)

if not index_to_remove.any():
break

swing_highs_lows[positions[index_to_remove]] = np.nan

positions = np.where(~np.isnan(swing_highs_lows))[0]

if len(positions) > 0:
if swing_highs_lows[positions[0]] == 1:
swing_highs_lows[0] = -1
Expand Down Expand Up @@ -392,8 +394,15 @@ def ob(
OBVolume = volume + 2 last volumes amounts
Percentage = strength of order block (min(highVolume, lowVolume)/max(highVolume,lowVolume))
"""

swing_highs_lows = swing_highs_lows.copy()
ohlc_len = len(ohlc)

_open = ohlc["open"].values
_high = ohlc["high"].values
_low = ohlc["low"].values
_close = ohlc["close"].values
_volume = ohlc["volume"].values
_swing_high_low = swing_highs_lows["HighLow"].values

crossed = np.full(len(ohlc), False, dtype=bool)
ob = np.zeros(len(ohlc), dtype=np.int32)
Expand All @@ -406,148 +415,154 @@ def ob(
mitigated_index = np.zeros(len(ohlc), dtype=np.int32)
breaker = np.full(len(ohlc), False, dtype=bool)

for i in range(len(ohlc)):
for i in range(ohlc_len):
close_index = i
close_price = ohlc["close"].iloc[close_index]

# Bullish Order Block
if len(ob[ob == 1]) > 0:
for j in range(len(ob) - 1, -1, -1):
if ob[j] == 1:
currentOB = j
if breaker[currentOB]:
if ohlc.high.iloc[close_index] > top[currentOB]:
ob[j] = top[j] = bottom[j] = obVolume[j] = lowVolume[
j
] = highVolume[j] = mitigated_index[j] = percentage[
j
] = 0.0
if _high[close_index] > top[currentOB]:
ob[j] = top[j] = bottom[j] = obVolume[j] = lowVolume[j] = (
highVolume[j]
) = mitigated_index[j] = percentage[j] = 0.0

elif (
not close_mitigation
and ohlc["low"].iloc[close_index] < bottom[currentOB]
not close_mitigation and _low[close_index] < bottom[currentOB]
) or (
close_mitigation
and min(
ohlc["open"].iloc[close_index],
ohlc["close"].iloc[close_index],
_open[close_index],
_close[close_index],
)
< bottom[currentOB]
):
breaker[currentOB] = True
mitigated_index[currentOB] = close_index - 1
last_top_index = None
for j in range(len(swing_highs_lows["HighLow"])):
if swing_highs_lows["HighLow"][j] == 1 and j < close_index:
last_top_index = j

last_top_indices = np.where(
(_swing_high_low == 1)
& (np.arange(len(swing_highs_lows["HighLow"])) < close_index)
)[0]

if last_top_indices.size > 0:
last_top_index = np.max(last_top_indices)
else:
last_top_index = None

if last_top_index is not None:
swing_top_price = ohlc["high"].iloc[last_top_index]
if close_price > swing_top_price and not crossed[last_top_index]:

swing_top_price = _high[last_top_index]
if _close[close_index] > swing_top_price and not crossed[last_top_index]:
crossed[last_top_index] = True
obBtm = ohlc["high"].iloc[close_index - 1]
obTop = ohlc["low"].iloc[close_index - 1]
obBtm = _high[close_index - 1]
obTop = _low[close_index - 1]
obIndex = close_index - 1
for j in range(1, close_index - last_top_index):
obBtm = min(
ohlc["low"].iloc[last_top_index + j],
_low[last_top_index + j],
obBtm,
)
if obBtm == ohlc["low"].iloc[last_top_index + j]:
obTop = ohlc["high"].iloc[last_top_index + j]
if obBtm == _low[last_top_index + j]:
obTop = _high[last_top_index + j]
obIndex = (
last_top_index + j
if obBtm == ohlc["low"].iloc[last_top_index + j]
if obBtm == _low[last_top_index + j]
else obIndex
)

ob[obIndex] = 1
top[obIndex] = obTop
bottom[obIndex] = obBtm
obVolume[obIndex] = (
ohlc["volume"].iloc[close_index]
+ ohlc["volume"].iloc[close_index - 1]
+ ohlc["volume"].iloc[close_index - 2]
)
lowVolume[obIndex] = ohlc["volume"].iloc[close_index - 2]
highVolume[obIndex] = (
ohlc["volume"].iloc[close_index]
+ ohlc["volume"].iloc[close_index - 1]
_volume[close_index]
+ _volume[close_index - 1]
+ _volume[close_index - 2]
)
lowVolume[obIndex] = _volume[close_index - 2]
highVolume[obIndex] = _volume[close_index] + _volume[close_index - 1]
percentage[obIndex] = (
np.min([highVolume[obIndex], lowVolume[obIndex]], axis=0)
/ np.max([highVolume[obIndex], lowVolume[obIndex]], axis=0) if np.max([highVolume[obIndex], lowVolume[obIndex]], axis=0) != 0 else 1
/ np.max([highVolume[obIndex], lowVolume[obIndex]], axis=0)
if np.max([highVolume[obIndex], lowVolume[obIndex]], axis=0) != 0
else 1
) * 100.0

for i in range(len(ohlc)):
close_index = i
close_price = ohlc["close"].iloc[close_index]
close_price = _close[close_index]

# Bearish Order Block
if len(ob[ob == -1]) > 0:
for j in range(len(ob) - 1, -1, -1):
if ob[j] == -1:
currentOB = j
if breaker[currentOB]:
if ohlc.low.iloc[close_index] < bottom[currentOB]:
ob[j] = top[j] = bottom[j] = obVolume[j] = lowVolume[
j
] = highVolume[j] = mitigated_index[j] = percentage[
j
] = 0.0
if _low[close_index] < bottom[currentOB]:

ob[j] = top[j] = bottom[j] = obVolume[j] = lowVolume[j] = (
highVolume[j]
) = mitigated_index[j] = percentage[j] = 0.0

elif (
not close_mitigation
and ohlc["high"].iloc[close_index] > top[currentOB]
not close_mitigation and _high[close_index] > top[currentOB]
) or (
close_mitigation
and max(
ohlc["open"].iloc[close_index],
ohlc["close"].iloc[close_index],
_open[close_index],
_close[close_index],
)
> top[currentOB]
):
breaker[currentOB] = True
mitigated_index[currentOB] = close_index
last_btm_index = None
for j in range(len(swing_highs_lows["HighLow"])):
if swing_highs_lows["HighLow"][j] == -1 and j < close_index:
last_btm_index = j

last_btm_indices = np.where(
(swing_highs_lows["HighLow"] == -1)
& (np.arange(len(swing_highs_lows["HighLow"])) < close_index)
)[0]
if last_btm_indices.size > 0:
last_btm_index = np.max(last_btm_indices)
else:
last_btm_index = None

if last_btm_index is not None:
swing_btm_price = ohlc["low"].iloc[last_btm_index]
swing_btm_price = _low[last_btm_index]
if close_price < swing_btm_price and not crossed[last_btm_index]:
crossed[last_btm_index] = True
obBtm = ohlc["low"].iloc[close_index - 1]
obTop = ohlc["high"].iloc[close_index - 1]
obBtm = _low[close_index - 1]
obTop = _high[close_index - 1]
obIndex = close_index - 1
for j in range(1, close_index - last_btm_index):
obTop = max(ohlc["high"].iloc[last_btm_index + j], obTop)
obTop = max(_high[last_btm_index + j], obTop)
obBtm = (
ohlc["low"].iloc[last_btm_index + j]
if obTop == ohlc["high"].iloc[last_btm_index + j]
_low[last_btm_index + j]
if obTop == _high[last_btm_index + j]
else obBtm
)
obIndex = (
last_btm_index + j
if obTop == ohlc["high"].iloc[last_btm_index + j]
if obTop == _high[last_btm_index + j]
else obIndex
)

ob[obIndex] = -1
top[obIndex] = obTop
bottom[obIndex] = obBtm
obVolume[obIndex] = (
ohlc["volume"].iloc[close_index]
+ ohlc["volume"].iloc[close_index - 1]
+ ohlc["volume"].iloc[close_index - 2]
)
lowVolume[obIndex] = (
ohlc["volume"].iloc[close_index]
+ ohlc["volume"].iloc[close_index - 1]
_volume[close_index]
+ _volume[close_index - 1]
+ _volume[close_index - 2]
)
highVolume[obIndex] = ohlc["volume"].iloc[close_index - 2]
lowVolume[obIndex] = _volume[close_index] + _volume[close_index - 1]
highVolume[obIndex] = _volume[close_index - 2]
percentage[obIndex] = (
np.min([highVolume[obIndex], lowVolume[obIndex]], axis=0)
/ np.max([highVolume[obIndex], lowVolume[obIndex]], axis=0) if np.max([highVolume[obIndex], lowVolume[obIndex]], axis=0) != 0 else 1
/ np.max([highVolume[obIndex], lowVolume[obIndex]], axis=0)
if np.max([highVolume[obIndex], lowVolume[obIndex]], axis=0) != 0
else 1
) * 100.0

ob = np.where(ob != 0, ob, np.nan)
Expand Down
Loading

0 comments on commit 30e40ec

Please sign in to comment.