"""Contains vectorized utility functions."""
"""Copyright (C) 2023 Edward West. All rights reserved.
This code is licensed under Apache 2.0 with Commons Clause license
(see LICENSE for details).
"""
import numpy as np
from numba import njit
from numpy.typing import NDArray
from typing import Literal
@njit
def _verify_input(array: NDArray[np.float64], n: int):
assert n > 0, "n needs to be >= 1."
assert n <= len(array), "n is greater than array length."
[docs]
@njit
def lowv(array: NDArray[np.float64], n: int) -> NDArray[np.float64]:
"""Calculates the lowest values for every ``n`` period in ``array``.
Args:
array: :class:`numpy.ndarray` of data.
n: Length of period.
Returns:
:class:`numpy.ndarray` of the lowest values for every ``n`` period in
``array``.
"""
if not len(array):
return np.array(tuple())
_verify_input(array, n)
out_len = len(array)
out = np.array([np.nan for _ in range(out_len)])
for i in range(n, out_len + 1):
out[i - 1] = np.min(array[i - n : i])
return out
[docs]
@njit
def highv(array: NDArray[np.float64], n: int) -> NDArray[np.float64]:
"""Calculates the highest values for every ``n`` period in ``array``.
Args:
array: :class:`numpy.ndarray` of data.
n: Length of period.
Returns:
:class:`numpy.ndarray` of the highest values for every ``n`` period in
``array``.
"""
if not len(array):
return np.array(tuple())
_verify_input(array, n)
out_len = len(array)
out = np.array([np.nan for _ in range(out_len)])
for i in range(n, out_len + 1):
out[i - 1] = np.max(array[i - n : i])
return out
[docs]
@njit
def sumv(array: NDArray[np.float64], n: int) -> NDArray[np.float64]:
"""Calculates the sums for every ``n`` period in ``array``.
Args:
array: :class:`numpy.ndarray` of data.
n: Length of period.
Returns:
:class:`numpy.ndarray` of the sums for every ``n`` period in ``array``.
"""
if not len(array):
return np.array(tuple())
_verify_input(array, n)
out_len = len(array)
out = np.array([np.nan for _ in range(out_len)])
for i in range(n, out_len + 1):
out[i - 1] = np.sum(array[i - n : i])
return out
[docs]
@njit
def returnv(array: NDArray[np.float64], n: int = 1) -> NDArray[np.float64]:
"""Calculates returns.
Args:
n: Return period. Defaults to 1.
Returns:
:class:`numpy.ndarray` of returns.
"""
if not len(array):
return np.array(tuple())
_verify_input(array, n)
out_len = len(array)
out = np.array([np.nan for _ in range(out_len)])
for i in range(n, out_len):
out[i] = (array[i] - array[i - n]) / array[i - n]
return out
[docs]
@njit
def cross(a: NDArray[np.float64], b: NDArray[np.float64]) -> NDArray[np.bool_]:
"""Checks for crossover of ``a`` above ``b``.
Args:
a: :class:`numpy.ndarray` of data.
b: :class:`numpy.ndarray` of data.
Returns:
:class:`numpy.ndarray` containing values of ``1`` when ``a`` crosses
above ``b``, otherwise values of ``0``.
"""
assert len(a), "a cannot be empty."
assert len(b), "b cannot be empty."
assert len(a) == len(b), "a and b must be same length."
assert len(a) >= 2, "a and b must have length >= 2."
crossed = np.where(a > b, 1, 0)
return (sumv(crossed > 0, 2) == 1) * crossed
[docs]
@njit
def normal_cdf(z: float) -> float:
"""Computes the CDF of the standard normal distribution."""
zz = np.fabs(z)
pdf = np.exp(-0.5 * zz * zz) / np.sqrt(2 * np.pi)
t = 1 / (1 + zz * 0.2316419)
poly = (
(((1.330274429 * t - 1.821255978) * t + 1.781477937) * t - 0.356563782)
* t
+ 0.319381530
) * t
return 1 - pdf * poly if z > 0 else pdf * poly
[docs]
@njit
def inverse_normal_cdf(p: float) -> float:
"""Computes the inverse CDF of the standard normal distribution."""
pp = p if p <= 0.5 else 1 - p
if pp == 0:
pp = 1.0e-10
t = np.sqrt(np.log(1 / (pp * pp)))
numer = (0.010328 * t + 0.802853) * t + 2.515517
denom = ((0.001308 * t + 0.189269) * t + 1.432788) * t + 1
x = t - numer / denom
return -x if p <= 0.5 else x
@njit
def _atr(
last_bar: int,
lookback: int,
high: NDArray[np.float64],
low: NDArray[np.float64],
close: NDArray[np.float64],
use_log: bool = False,
) -> float:
"""Computes Average True Range.
Args:
last_bar: Index of last bar for ATR calculation.
lookback: Number of lookback bars.
high: High prices.
low: Low prices.
close: Close prices.
use_log: Whether to log transform. Defaults to ``False``.
Returns:
The computed ATR.
"""
assert last_bar >= lookback
if lookback == 0:
if use_log:
return np.log(high[last_bar] / low[last_bar])
else:
return high[last_bar] - low[last_bar]
total = 0.0
for i in range(last_bar - lookback + 1, last_bar + 1):
if use_log:
term = high[i] / low[i]
if high[i] / close[i - 1] > term:
term = high[i] / close[i - 1]
if close[i - 1] / low[i] > term:
term = close[i - 1] / low[i]
total += np.log(term)
else:
term = high[i] - low[i]
if high[i] - close[i - 1] > term:
term = high[i] - close[i - 1]
if close[i - 1] - low[i] > term:
term = close[i - 1] - low[i]
total += term
return total / lookback
@njit
def _variance(
use_change: bool, last_bar: int, length: int, prices: NDArray[np.float64]
) -> float:
if use_change:
assert last_bar >= length
else:
assert last_bar >= length - 1
total = 0.0
for i in range(last_bar - length + 1, last_bar + 1):
if use_change:
term = np.log(prices[i] / prices[i - 1])
else:
term = np.log(prices[i])
total += term
mean = total / length
total = 0.0
for i in range(last_bar - length + 1, last_bar + 1):
if use_change:
term = np.log(prices[i] / prices[i - 1]) - mean
else:
term = np.log(prices[i]) - mean
total += term * term
return total / length
[docs]
@njit
def detrended_rsi(
values: NDArray[np.float64],
short_length: int,
long_length: int,
reg_length: int,
) -> NDArray[np.float64]:
"""Computes Detrended Relative Strength Index (RSI).
Args:
values: :class:`numpy.ndarray` of input.
short_length: Lookback for the short-term RSI.
long_length: Lookback for the long-term RSI.
reg_length: Number of bars used for linear regressions.
Returns:
:class:`numpy.ndarray` of computed values.
"""
assert short_length > 0
assert short_length <= long_length
assert short_length > 1
assert long_length > 1
assert reg_length >= 1
n = len(values)
front_bad = long_length + reg_length - 1
output = np.zeros(n)
if front_bad >= n:
return output
work1 = np.zeros(n)
for i in range(short_length):
work1[i] = 1.0e90
up_sum = dn_sum = 1.0e-60
for i in range(1, short_length):
diff = values[i] - values[i - 1]
if diff > 0.0:
up_sum += diff
else:
dn_sum -= diff
up_sum /= short_length - 1
dn_sum /= short_length - 1
for i in range(short_length, n):
diff = values[i] - values[i - 1]
if diff > 0:
up_sum = ((short_length - 1.0) * up_sum + diff) / short_length
dn_sum *= (short_length - 1.0) / short_length
else:
dn_sum = ((short_length - 1.0) * dn_sum - diff) / short_length
up_sum *= (short_length - 1.0) / short_length
work1[i] = 100.0 * up_sum / (up_sum + dn_sum)
if short_length == 2:
work1[i] = -10.0 * np.log(
2.0 / (1 + 0.00999 * (2 * work1[i] - 100)) - 1
)
work2 = np.zeros(n)
for i in range(long_length):
work2[i] = -1.0e90
up_sum = dn_sum = 1.0e-60
for i in range(1, long_length):
diff = values[i] - values[i - 1]
if diff > 0.0:
up_sum += diff
else:
dn_sum -= diff
up_sum /= long_length - 1
dn_sum /= long_length - 1
for i in range(long_length, n):
diff = values[i] - values[i - 1]
if diff > 0.0:
up_sum = ((long_length - 1.0) * up_sum + diff) / long_length
dn_sum *= (long_length - 1.0) / long_length
else:
dn_sum = ((long_length - 1.0) * dn_sum - diff) / long_length
up_sum *= (long_length - 1.0) / long_length
work2[i] = 100.0 * up_sum / (up_sum + dn_sum)
for i in range(front_bad, n):
x_mean = y_mean = 0.0
for j in range(reg_length):
k = i - j
x_mean += work2[k]
y_mean += work1[k]
x_mean /= reg_length
y_mean /= reg_length
xss = xy = 0.0
for j in range(reg_length):
k = i - j
x_diff = work2[k] - x_mean
y_diff = work1[k] - y_mean
xss += x_diff * x_diff
xy += x_diff * y_diff
coef = xy / (xss + 1.0e-60)
x_diff = work2[i] - x_mean
y_diff = work1[i] - y_mean
output[i] = y_diff - coef * x_diff
return output
[docs]
@njit
def macd(
high: NDArray[np.float64],
low: NDArray[np.float64],
close: NDArray[np.float64],
short_length: int,
long_length: int,
smoothing: float = 0.0,
scale: float = 1.0,
) -> NDArray[np.float64]:
"""Computes Moving Average Convergence Divergence.
Args:
high: High prices.
low: Low prices.
close: Close prices.
short_length: Short-term lookback.
long_length: Long-term lookback.
smoothing: Compute MACD minus smoothed if >= 2.
scale: Increase > 1.0 for more compression of return values,
decrease < 1.0 for less. Defaults to ``1.0``.
Returns:
:class:`numpy.ndarray` of computed values ranging [-50, 50].
"""
assert len(high) == len(low) and len(high) == len(close)
assert short_length > 0
assert short_length <= long_length
assert smoothing >= 0
assert scale > 0
n = len(close)
output = np.zeros(n)
long_alpha = 2.0 / (long_length + 1.0)
short_alpha = 2.0 / (short_length + 1.0)
long_sum = short_sum = close[0]
for i in range(1, n):
long_sum = long_alpha * close[i] + (1.0 - long_alpha) * long_sum
short_sum = short_alpha * close[i] + (1.0 - short_alpha) * short_sum
diff = 0.5 * (long_length - 1.0)
diff -= 0.5 * (short_length - 1.0)
denom = np.sqrt(np.fabs(diff))
k = long_length + smoothing
if k > i:
k = i
denom *= _atr(i, k, high, low, close, False)
output[i] = (short_sum - long_sum) / (denom + 1.0e-15)
output[i] = 100.0 * normal_cdf(scale * output[i]) - 50.0
if smoothing > 1:
alpha = 2.0 / (smoothing + 1.0)
smoothed = output[0]
for i in range(1, n):
smoothed = alpha * output[i] + (1.0 - alpha) * smoothed
output[i] -= smoothed
return output
[docs]
@njit
def stochastic(
high: NDArray[np.float64],
low: NDArray[np.float64],
close: NDArray[np.float64],
lookback: int,
smoothing: int = 0,
) -> NDArray[np.float64]:
"""Computes Stochastic.
Args:
high: High prices.
low: Low prices.
close: Close prices.
lookback: Number of lookback bars.
smoothing: Number of times the raw stochastic is smoothed, either 0,
1, or 2 times. Defaults to ``0``.
Returns:
:class:`numpy.ndarray` of computed values.
"""
assert len(high) == len(low) and len(high) == len(close)
assert lookback > 0
assert smoothing == 0 or smoothing == 1 or smoothing == 2
n = len(close)
front_bad = lookback - 1
if front_bad > n:
front_bad = n
output = np.zeros(n)
for i in range(front_bad, n):
min_val = 1.0e60
max_val = -1.0e60
for j in range(lookback):
if high[i - j] > max_val:
max_val = high[i - j]
if low[i - j] < min_val:
min_val = low[i - j]
sto_0 = (close[i] - min_val) / (max_val - min_val + 1.0e-60)
if smoothing == 0:
output[i] = 100.0 * sto_0 - 50
else:
if i == front_bad:
sto_1 = sto_0
output[i] = 100.0 * sto_0 - 50
else:
sto_1 = 0.33333333 * sto_0 + 0.66666667 * sto_1
if smoothing == 1:
output[i] = 100.0 * sto_1 - 50
else:
if i == front_bad + 1:
sto_2 = sto_1
output[i] = 100.0 * sto_1 - 50
else:
sto_2 = 0.33333333 * sto_1 + 0.66666667 * sto_2
output[i] = 100.0 * sto_2 - 50
return output
[docs]
@njit
def stochastic_rsi(
values: NDArray[np.float64],
rsi_lookback: int,
sto_lookback: int,
smoothing: float = 0.0,
) -> NDArray[np.float64]:
"""Computes Stochastic Relative Strength Index (RSI).
Args:
values: :class:`numpy.ndarray` of input.
rsi_lookback: Lookback length for RSI calculation.
sto_lookback: Lookback length for Stochastic calculation.
smoothing: Amount of smoothing; <= 1 for none. Defaults to ``0``.
Returns:
:class:`numpy.ndarray` of computed values.
"""
assert rsi_lookback > 0
assert sto_lookback > 0
assert smoothing >= 0
n = len(values)
front_bad = rsi_lookback + sto_lookback - 1
if front_bad > n:
front_bad = n
output = np.zeros(n)
if rsi_lookback >= n:
return output
for i in range(front_bad):
output[i] = 0
up_sum = dn_sum = 1.0e-60
for i in range(1, rsi_lookback):
diff = values[i] - values[i - 1]
if diff > 0.0:
up_sum += diff
else:
dn_sum -= diff
up_sum /= rsi_lookback - 1
dn_sum /= rsi_lookback - 1
work1 = np.zeros(n)
for i in range(rsi_lookback, n):
diff = values[i] - values[i - 1]
if diff > 0.0:
up_sum = ((rsi_lookback - 1) * up_sum + diff) / rsi_lookback
dn_sum *= (rsi_lookback - 1.0) / rsi_lookback
else:
dn_sum = ((rsi_lookback - 1) * dn_sum - diff) / rsi_lookback
up_sum *= (rsi_lookback - 1.0) / rsi_lookback
work1[i] = 100.0 * up_sum / (up_sum + dn_sum)
for i in range(front_bad, n):
min_val = 1.0e60
max_val = -1.0e60
for j in range(sto_lookback):
if work1[i - j] > max_val:
max_val = work1[i - j]
if work1[i - j] < min_val:
min_val = work1[i - j]
output[i] = (
100.0 * (work1[i] - min_val) / (max_val - min_val + 1.0e-60) - 50.0
)
if smoothing > 1:
alpha = 2.0 / (smoothing + 1.0)
smoothed = output[front_bad]
for i in range(front_bad + 1, n):
smoothed = alpha * output[i] + (1.0 - alpha) * smoothed
output[i] = smoothed
return output
@njit
def _legendre_1(n: int) -> NDArray[np.float64]:
c1 = np.zeros(n)
total = 0.0
for i in range(n):
c1[i] = 2.0 * i / (n - 1.0) - 1.0
total += c1[i] * c1[i]
total = np.sqrt(total)
for i in range(n):
c1[i] /= total
return c1
@njit
def _legendre_2(n: int) -> tuple[NDArray, NDArray]:
c1 = _legendre_1(n)
c2 = np.zeros(n)
total = 0.0
for i in range(n):
c2[i] = c1[i] * c1[i]
total += c2[i]
mean = total / n
total = 0.0
for i in range(n):
c2[i] -= mean
total += c2[i] * c2[i]
total = np.sqrt(total)
for i in range(n):
c2[i] /= total
return c1, c2
@njit
def _legendre_3(n: int) -> tuple[NDArray, NDArray, NDArray]:
"""Computes the first three Legendre polynomials.
The first polynomial measures linear trend, the second measures the
quadratic trend, and the third measures the cubic trend.
Args:
n: Length of result.
Returns:
Tuple of first three Legendre polynomials.
"""
c1, c2 = _legendre_2(n)
c3 = np.zeros(n)
total = 0.0
for i in range(n):
c3[i] = c1[i] * c1[i] * c1[i]
total += c3[i]
mean = total / n
total = 0.0
for i in range(n):
c3[i] -= mean
total += c3[i] * c3[i]
total = np.sqrt(total)
for i in range(n):
c3[i] /= total
proj = 0.0
for i in range(n):
proj += c1[i] * c3[i]
total = 0.0
for i in range(n):
c3[i] -= proj * c1[i]
total += c3[i] * c3[i]
total = np.sqrt(total)
for i in range(n):
c3[i] /= total
return c1, c2, c3
@njit
def _trend(
values: NDArray[np.float64],
high: NDArray[np.float64],
low: NDArray[np.float64],
close: NDArray[np.float64],
lookback: int,
atr_length: int,
scale: float,
trend_type: Literal["linear", "quadratic", "cubic"],
) -> NDArray[np.float64]:
assert (
len(values) == len(high)
and len(values) == len(low)
and len(values) == len(close)
)
assert lookback > 0
assert atr_length > 0
assert scale > 0
n = len(values)
front_bad = lookback - 1 if ((lookback - 1) > atr_length) else atr_length
if front_bad > n:
front_bad = n
output = np.zeros(n)
dptr = None
for i in range(front_bad, n):
if trend_type == "linear":
dptr = _legendre_1(lookback)
elif trend_type == "quadratic":
_, dptr = _legendre_2(lookback)
else:
_, _, dptr = _legendre_3(lookback)
dptr_i = 0
dot_prod = 0.0
mean = 0.0
for j in range(i - lookback + 1, i + 1):
price = np.log(values[j])
mean += price
dot_prod += price * dptr[dptr_i]
dptr_i += 1
mean /= lookback
dptr_i -= lookback
k = lookback - 1
if lookback == 2:
k = 2
denom = _atr(i, atr_length, high, low, close, True) * k
output[i] = dot_prod * 2.0 / (denom + 1.0e-60)
yss = rsq = 0.0
for j in range(i - lookback + 1, i + 1):
price = np.log(values[j])
diff = price - mean
yss += diff * diff
pred = dot_prod * dptr[dptr_i]
dptr_i += 1
diff = diff - pred
rsq += diff * diff
rsq = 1 - rsq / (yss + 1.0e-60)
if rsq < 0:
rsq = 0
output[i] *= rsq
output[i] = 100 * normal_cdf(scale * output[i]) - 50
return output
[docs]
def linear_trend(
values: NDArray[np.float64],
high: NDArray[np.float64],
low: NDArray[np.float64],
close: NDArray[np.float64],
lookback: int,
atr_length: int,
scale: float = 1.0,
) -> NDArray[np.float64]:
"""Computes Linear Trend Strength.
Args:
values: :class:`numpy.ndarray` of input.
high: High prices.
low: Low prices.
close: Close prices.
lookback: Number of lookback bars.
atr_length: Lookback length used for Average True Range (ATR)
normalization.
scale: Increase > 1.0 for more compression of return values,
decrease < 1.0 for less. Defaults to ``1.0``.
Returns:
:class:`numpy.ndarray` of computed values ranging [-50, 50].
"""
return _trend(
values, high, low, close, lookback, atr_length, scale, "linear"
)
[docs]
def quadratic_trend(
values: NDArray[np.float64],
high: NDArray[np.float64],
low: NDArray[np.float64],
close: NDArray[np.float64],
lookback: int,
atr_length: int,
scale: float = 1.0,
) -> NDArray[np.float64]:
"""Computes Quadratic Trend Strength.
Args:
values: :class:`numpy.ndarray` of input.
high: High prices.
low: Low prices.
close: Close prices.
lookback: Number of lookback bars.
atr_length: Lookback length used for Average True Range (ATR)
normalization.
scale: Increase > 1.0 for more compression of return values,
decrease < 1.0 for less. Defaults to ``1.0``.
Returns:
:class:`numpy.ndarray` of computed values ranging [-50, 50].
"""
return _trend(
values, high, low, close, lookback, atr_length, scale, "quadratic"
)
[docs]
def cubic_trend(
values: NDArray[np.float64],
high: NDArray[np.float64],
low: NDArray[np.float64],
close: NDArray[np.float64],
lookback: int,
atr_length: int,
scale: float = 1.0,
) -> NDArray[np.float64]:
"""Computes Cubic Trend Strength.
Args:
values: :class:`numpy.ndarray` of input.
high: High prices.
low: Low prices.
close: Close prices.
lookback: Number of lookback bars.
atr_length: Lookback length used for Average True Range (ATR)
normalization.
scale: Increase > 1.0 for more compression of return values,
decrease < 1.0 for less. Defaults to ``1.0``.
Returns:
:class:`numpy.ndarray` of computed values ranging [-50, 50].
"""
return _trend(
values, high, low, close, lookback, atr_length, scale, "cubic"
)
[docs]
@njit
def adx(
high: NDArray[np.float64],
low: NDArray[np.float64],
close: NDArray[np.float64],
lookback: int,
) -> NDArray[np.float64]:
"""Computes Average Directional Movement Index.
Args:
high: High prices.
low: Low prices.
close: Close prices.
lookback: Number of lookback bars.
Returns:
:class:`numpy.ndarray` of computed values.
"""
assert len(high) == len(low) and len(high) == len(close)
assert lookback > 0
n = len(close)
output = np.zeros(n)
if n <= 2 * lookback:
return output
output[0] = 0
dms_plus = dms_minus = atr_ = 0.0
for i in range(1, lookback + 1):
dm_plus = high[i] - high[i - 1]
dm_minus = low[i - 1] - low[i]
if dm_plus >= dm_minus:
dm_minus = 0.0
else:
dm_plus = 0.0
if dm_plus < 0.0:
dm_plus = 0.0
if dm_minus < 0.0:
dm_minus = 0.0
dms_plus += dm_plus
dms_minus += dm_minus
term = high[i] - low[i]
if high[i] - close[i - 1] > term:
term = high[i] - close[i - 1]
if close[i - 1] - low[i] > term:
term = close[i - 1] - low[i]
atr_ += term
di_plus = dms_plus / (atr_ + 1.0e-10)
di_minus = dms_minus / (atr_ + 1.0e-10)
adx_ = np.fabs(di_plus - di_minus) / (di_plus + di_minus + 1.0e-10)
output[i] = 100 * adx_
for i in range(lookback + 1, 2 * lookback):
dm_plus = high[i] - high[i - 1]
dm_minus = low[i - 1] - low[i]
if dm_plus >= dm_minus:
dm_minus = 0.0
else:
dm_plus = 0.0
if dm_plus < 0.0:
dm_plus = 0.0
if dm_minus < 0.0:
dm_minus = 0.0
dms_plus = (lookback - 1.0) / lookback * dms_plus + dm_plus
dms_minus = (lookback - 1.0) / lookback * dms_minus + dm_minus
term = high[i] - low[i]
if high[i] - close[i - 1] > term:
term = high[i] - close[i - 1]
if close[i - 1] - low[i] > term:
term = close[i - 1] - low[i]
atr_ = (lookback - 1.0) / lookback * atr_ + term
di_plus = dms_plus / (atr_ + 1.0e-10)
di_minus = dms_minus / (atr_ + 1.0e-10)
adx_ += np.fabs(di_plus - di_minus) / (di_plus + di_minus + 1.0e-10)
output[i] = 100 * adx_ / (i - lookback + 1)
adx_ /= lookback
for i in range(2 * lookback, n):
dm_plus = high[i] - high[i - 1]
dm_minus = low[i - 1] - low[i]
if dm_plus >= dm_minus:
dm_minus = 0.0
else:
dm_plus = 0.0
if dm_plus < 0.0:
dm_plus = 0.0
if dm_minus < 0.0:
dm_minus = 0.0
dms_plus = (lookback - 1.0) / lookback * dms_plus + dm_plus
dms_minus = (lookback - 1.0) / lookback * dms_minus + dm_minus
term = high[i] - low[i]
if high[i] - close[i - 1] > term:
term = high[i] - close[i - 1]
if close[i - 1] - low[i] > term:
term = close[i - 1] - low[i]
atr_ = (lookback - 1.0) / lookback * atr_ + term
di_plus = dms_plus / (atr_ + 1.0e-10)
di_minus = dms_minus / (atr_ + 1.0e-10)
term = np.fabs(di_plus - di_minus) / (di_plus + di_minus + 1.0e-10)
adx_ = (lookback - 1.0) / lookback * adx_ + term / lookback
output[i] = 100 * adx_
return output
@njit
def _aroon(
high: NDArray[np.float64],
low: NDArray[np.float64],
lookback: int,
aroon_type: Literal["up", "down", "diff"],
) -> NDArray[np.float64]:
assert len(high) == len(low)
assert lookback > 0
n = len(high)
output = np.zeros(n)
if aroon_type == "up" or aroon_type == "down":
output[0] = 50
elif aroon_type == "diff":
output[0] = 0
for i in range(1, n):
if aroon_type == "up" or aroon_type == "diff":
i_max = i
x_max = high[i]
for i in range(i - 1, i - lookback - 1, -1):
if i < 0:
break
if high[i] > x_max:
x_max = high[i]
i_max = i
if aroon_type == "down" or aroon_type == "diff":
i_min = i
x_min = low[i]
for i in range(i - 1, i - lookback - 1, -1):
if i < 0:
break
if low[i] < x_min:
x_min = low[i]
i_min = i
if aroon_type == "up":
output[i] = 100 * (lookback - (i - i_max)) / lookback
elif aroon_type == "down":
output[i] = 100 * (lookback - (i - i_min)) / lookback
else:
max_val = 100 * (lookback - (i - i_max)) / lookback
min_val = 100 * (lookback - (i - i_min)) / lookback
output[i] = max_val - min_val
return output
[docs]
@njit
def aroon_up(
high: NDArray[np.float64],
low: NDArray[np.float64],
lookback: int,
) -> NDArray[np.float64]:
"""Computes Aroon Upward Trend.
Args:
high: High prices.
low: Low prices.
lookback: Number of lookback bars.
Returns:
:class:`numpy.ndarray` of computed values.
"""
return _aroon(high, low, lookback, "up")
[docs]
@njit
def aroon_down(
high: NDArray[np.float64],
low: NDArray[np.float64],
lookback: int,
) -> NDArray[np.float64]:
"""Computes Aroon Downward Trend.
Args:
high: High prices.
low: Low prices.
lookback: Number of lookback bars.
Returns:
:class:`numpy.ndarray` of computed values.
"""
return _aroon(high, low, lookback, "down")
[docs]
@njit
def aroon_diff(
high: NDArray[np.float64],
low: NDArray[np.float64],
lookback: int,
) -> NDArray[np.float64]:
"""Computes Aroon Upward Trend minus Aroon Downward Trend.
Args:
high: High prices.
low: Low prices.
lookback: Number of lookback bars.
Returns:
:class:`numpy.ndarray` of computed values.
"""
return _aroon(high, low, lookback, "diff")
[docs]
@njit
def close_minus_ma(
high: NDArray[np.float64],
low: NDArray[np.float64],
close: NDArray[np.float64],
lookback: int,
atr_length: int,
scale: float = 1.0,
) -> NDArray[np.float64]:
"""Computes Close Minus Moving Average.
Args:
close: Close prices.
high: High prices.
low: Low prices.
lookback: Number of lookback bars.
atr_length: Lookback length used for Average True Range (ATR)
normalization.
scale: Increase > 1.0 for more compression of return values,
decrease < 1.0 for less. Defaults to ``1.0``.
Returns:
:class:`numpy.ndarray` of computed values ranging [-50, 50].
"""
assert len(high) == len(low) and len(high) == len(close)
assert lookback > 0
assert atr_length > 0
assert scale > 0
n = len(close)
front_bad = max(lookback, atr_length)
if front_bad > n:
front_bad = n
output = np.zeros(n)
for i in range(front_bad, n):
total = 0.0
for j in range(i - lookback, i):
total += np.log(close[j])
total /= lookback
denom = _atr(i, atr_length, high, low, close, True)
if denom > 0.0:
denom *= np.sqrt(lookback + 1.0)
output[i] = (np.log(close[i]) - total) / denom
output[i] = 100.0 * normal_cdf(scale * output[i]) - 50.0
else:
output[i] = 0.0
return output
@njit
def _deviation(
values: NDArray[np.float64],
lookback: int,
scale: float,
dev_type: Literal["linear", "quadratic", "cubic"],
) -> NDArray[np.float64]:
assert lookback > 0
assert scale > 0
n = len(values)
if dev_type == "linear" and lookback < 3:
lookback = 3
if dev_type == "quadratic" and lookback < 4:
lookback = 4
if dev_type == "cubic" and lookback < 5:
lookback = 5
front_bad = lookback - 1
if front_bad > n:
front_bad = n
if dev_type == "quadratic" or dev_type == "cubic":
work1, work2, work3 = _legendre_3(lookback)
else:
work1 = _legendre_1(lookback)
output = np.zeros(n)
for i in range(front_bad, n):
c0 = c1 = c2 = c3 = 0.0
dptr = work1
dptr_i = 0
for j in range(i - lookback + 1, i + 1):
price = np.log(values[j])
c0 += price
c1 += price * dptr[dptr_i]
dptr_i += 1
c0 /= lookback
if dev_type == "quadratic" or dev_type == "cubic":
dptr = work2
dptr_i = 0
for j in range(i - lookback + 1, i + 1):
price = np.log(values[j])
c2 += price * dptr[dptr_i]
dptr_i += 1
if dev_type == "cubic":
dptr = work3
dptr_i = 0
for j in range(i - lookback + 1, i + 1):
price = np.log(values[j])
c3 += price * dptr[dptr_i]
dptr_i += 1
j = 0
total = 0.0
for k in range(i - lookback + 1, i + 1):
pred = c0 + c1 * work1[j]
if dev_type == "quadratic" or dev_type == "cubic":
pred += c2 * work2[j]
if dev_type == "cubic":
pred += c3 * work3[j]
diff = np.log(values[k]) - pred
total += diff * diff
j += 1
denom = np.sqrt(total / lookback)
if denom > 0.0:
pred = c0 + c1 * work1[lookback - 1]
if dev_type == "quadratic" or dev_type == "cubic":
pred += c2 * work2[lookback - 1]
if dev_type == "cubic":
pred += c3 * work3[lookback - 1]
output[i] = (np.log(values[i]) - pred) / denom
output[i] = 100.0 * normal_cdf(scale * output[i]) - 50.0
else:
output[i] = 0.0
return output
[docs]
@njit
def linear_deviation(
values: NDArray[np.float64],
lookback: int,
scale: float = 0.6,
) -> NDArray[np.float64]:
"""Computes Deviation from Linear Trend.
Args:
values: :class:`numpy.ndarray` of input.
lookback: Number of lookback bars.
scale: Increase > 1.0 for more compression of return values,
decrease < 1.0 for less. Defaults to ``0.6``.
Returns:
:class:`numpy.ndarray` of computed values ranging [-50, 50].
"""
return _deviation(values, lookback, scale, "linear")
[docs]
@njit
def quadratic_deviation(
values: NDArray[np.float64],
lookback: int,
scale: float = 0.6,
) -> NDArray[np.float64]:
"""Computes Deviation from Quadratic Trend.
Args:
values: :class:`numpy.ndarray` of input.
lookback: Number of lookback bars.
scale: Increase > 1.0 for more compression of return values,
decrease < 1.0 for less. Defaults to ``0.6``.
Returns:
:class:`numpy.ndarray` of computed values ranging [-50, 50].
"""
return _deviation(values, lookback, scale, "quadratic")
[docs]
@njit
def cubic_deviation(
values: NDArray[np.float64],
lookback: int,
scale: float = 0.6,
) -> NDArray[np.float64]:
"""Computes Deviation from Cubic Trend.
Args:
values: :class:`numpy.ndarray` of input.
lookback: Number of lookback bars.
scale: Increase > 1.0 for more compression of return values,
decrease < 1.0 for less. Defaults to ``0.6``.
Returns:
:class:`numpy.ndarray` of computed values ranging [-50, 50].
"""
return _deviation(values, lookback, scale, "cubic")
[docs]
@njit
def price_intensity(
open: NDArray[np.float64],
high: NDArray[np.float64],
low: NDArray[np.float64],
close: NDArray[np.float64],
smoothing: float = 0.0,
scale: float = 0.8,
) -> NDArray[np.float64]:
"""Computes Price Intensity.
Args:
open: Open prices.
high: High prices.
low: Low prices.
close: Close prices.
smoothing: Amount of smoothing. Defaults to ``0``.
scale: Increase > 1.0 for more compression of return values,
decrease < 1.0 for less. Defaults to ``0.8``.
Returns:
:class:`numpy.ndarray` of computed values ranging [-50, 50].
"""
assert (
len(open) == len(high)
and len(open) == len(low)
and len(open) == len(close)
)
assert smoothing >= 0
assert scale > 0
n = len(close)
if smoothing < 1:
smoothing = 1
output = np.zeros(n)
denom = high[0] - low[0]
if denom < 1.0e-60:
denom = 1.0e-60
output[0] = (close[0] - open[0]) / denom
for i in range(1, n):
denom = high[i] - low[i]
if high[i] - close[i - 1] > denom:
denom = high[i] - close[i - 1]
if close[i - 1] - low[i] > denom:
denom = close[i - 1] - low[i]
if denom < 1.0e-60:
denom = 1.0e-60
output[i] = (close[i] - open[i]) / denom
if smoothing > 1:
alpha = 2.0 / (smoothing + 1.0)
smoothed = output[0]
for i in range(1, n):
smoothed = alpha * output[i] + (1.0 - alpha) * smoothed
output[i] = smoothed
for i in range(n):
output[i] = (
100.0 * normal_cdf(scale * np.sqrt(smoothing) * output[i]) - 50.0
)
return output
[docs]
@njit
def price_change_oscillator(
high: NDArray[np.float64],
low: NDArray[np.float64],
close: NDArray[np.float64],
short_length: int,
multiplier: int,
scale: float = 4.0,
) -> NDArray[np.float64]:
"""Computes Price Change Oscillator.
Args:
high: High prices.
low: Low prices.
close: Close prices.
short_length: Number of short lookback bars.
multiplier: Multiplier used to compute number of long lookback bars =
``multiplier * short_length``.
scale: Increase > 1.0 for more compression of return values,
decrease < 1.0 for less. Defaults to ``4.0``.
Returns:
:class:`numpy.ndarray` of computed values ranging [-50, 50].
"""
assert len(high) == len(low) and len(high) == len(close)
assert short_length > 0
assert multiplier > 0
assert scale > 0
n = len(close)
if multiplier < 2:
multiplier = 2
long_length = short_length * multiplier
front_bad = long_length
if front_bad > n:
front_bad = n
output = np.zeros(n)
for i in range(front_bad, n):
short_sum = 0.0
for j in range(i - short_length - 1, i + 1):
short_sum += np.fabs(np.log(close[j] / close[j - 1]))
long_sum = short_sum
for j in range(i - long_length + 1, i - short_length + 1):
long_sum += np.fabs(np.log(close[j] / close[j - 1]))
short_sum /= short_length
long_sum /= long_length
denom = 0.36 + 1.0 / short_length
v = np.log(0.5 * multiplier) / 1.609
denom += 0.7 * v
denom *= _atr(i, long_length, high, low, close, True)
if denom > 1.0e-20:
output[i] = (short_sum - long_sum) / denom
output[i] = 100.0 * normal_cdf(scale * output[i]) - 50.0
else:
output[i] = 0.0
return output
@njit
def _flow(
high: NDArray[np.float64],
low: NDArray[np.float64],
close: NDArray[np.float64],
volume: NDArray[np.float64],
lookback: int,
smoothing: float,
flow_type: Literal["intraday", "money_flow"],
) -> NDArray[np.float64]:
assert (
len(high) == len(low)
and len(high) == len(close)
and len(high) == len(volume)
)
assert lookback > 0
assert smoothing >= 0
n = len(close)
front_bad = lookback - 1
for first_volume in range(n):
if volume[first_volume] > 0:
break
front_bad += first_volume
if front_bad > n:
front_bad = n
output = np.zeros(n)
for i in range(first_volume, n):
if high[i] > low[i]:
output[i] = (
100.0
* (2.0 * close[i] - high[i] - low[i])
/ (high[i] - low[i])
* volume[i]
)
else:
output[i] = 0.0
if lookback > 1:
for i in range(n - 1, front_bad - 1, -1):
total = 0.0
for j in range(lookback):
total += output[i - j]
output[i] = total / lookback
if flow_type == "money_flow":
for i in range(front_bad, n):
total = 0.0
for j in range(lookback):
total += volume[i - j]
total /= lookback
if total > 0.0:
output[i] /= total
else:
output[i] = 0.0
elif smoothing > 1:
alpha = 2.0 / (smoothing + 1.0)
smoothed = volume[first_volume]
for i in range(first_volume, n):
smoothed = alpha * volume[i] + (1.0 - alpha) * smoothed
if smoothed > 0.0:
output[i] /= smoothed
else:
output[i] = 0.0
for i in range(front_bad):
output[i] = 0.0
return output
[docs]
@njit
def intraday_intensity(
high: NDArray[np.float64],
low: NDArray[np.float64],
close: NDArray[np.float64],
volume: NDArray[np.float64],
lookback: int,
smoothing: float = 0.0,
) -> NDArray[np.float64]:
"""Computes Intraday Intensity.
Args:
high: High prices.
low: Low prices.
close: Close prices.
volume: Trading volume.
lookback: Number of lookback bars.
smoothing: Amount of smoothing; <= 1 for none. Defaults to ``0``.
Returns:
:class:`numpy.ndarray` of computed values.
"""
return _flow(high, low, close, volume, lookback, smoothing, "intraday")
[docs]
@njit
def money_flow(
high: NDArray[np.float64],
low: NDArray[np.float64],
close: NDArray[np.float64],
volume: NDArray[np.float64],
lookback: int,
smoothing: float = 0.0,
) -> NDArray[np.float64]:
"""Computes Chaikin's Money Flow.
Args:
high: High prices.
low: Low prices.
close: Close prices.
volume: Trading volume.
lookback: Number of lookback bars.
smoothing: Amount of smoothing; <= 1 for none. Defaults to ``0``.
Returns:
:class:`numpy.ndarray` of computed values.
"""
return _flow(high, low, close, volume, lookback, smoothing, "money_flow")
[docs]
@njit
def reactivity(
high: NDArray[np.float64],
low: NDArray[np.float64],
close: NDArray[np.float64],
volume: NDArray[np.float64],
lookback: int,
smoothing: float = 0.0,
scale: float = 0.6,
) -> NDArray[np.float64]:
"""Computes Reactivity.
Args:
high: High prices.
low: Low prices.
close: Close prices.
volume: Trading volume.
lookback: Number of lookback bars.
smoothing: Smoothing multiplier.
scale: Increase > 1.0 for more compression of return values,
decrease < 1.0 for less. Defaults to ``0.6``.
Returns:
:class:`numpy.ndarray` of computed values ranging [-50, 50].
"""
assert (
len(high) == len(low)
and len(high) == len(close)
and len(high) == len(volume)
)
assert lookback > 0
assert smoothing >= 0
assert scale > 0
n = len(close)
front_bad = lookback
for first_volume in range(n):
if volume[first_volume] > 0:
break
front_bad += first_volume
if front_bad > n:
front_bad = n
output = np.zeros(n)
for i in range(front_bad):
output[i] = 0.0
alpha = 2.0 / (lookback * smoothing + 1)
lowest = low[first_volume]
highest = high[first_volume]
smoothed_range = highest - lowest
smoothed_volume = volume[first_volume]
if smoothed_range == 0:
return output
if first_volume + 1 >= n or first_volume + lookback >= n:
return output
for i in range(first_volume + 1, first_volume + lookback):
if high[i] > highest:
highest = high[i]
if low[i] < lowest:
lowest = low[i]
smoothed_range = (
alpha * (highest - lowest) + (1.0 - alpha) * smoothed_range
)
smoothed_volume = alpha * volume[i] + (1.0 - alpha) * smoothed_volume
for i in range(front_bad, n):
lowest = low[i]
highest = high[i]
for j in range(1, lookback + 1):
if high[i - j] > highest:
highest = high[i - j]
if low[i - j] < lowest:
lowest = low[i - j]
smoothed_range = (
alpha * (highest - lowest) + (1.0 - alpha) * smoothed_range
)
smoothed_volume = alpha * volume[i] + (1.0 - alpha) * smoothed_volume
aspect_ratio = (highest - lowest) / smoothed_range
if volume[i] > 0.0 and smoothed_volume > 0.0:
aspect_ratio /= volume[i] / smoothed_volume
else:
aspect_ratio = 1.0
output[i] = aspect_ratio * (close[i] - close[i - lookback])
output[i] /= smoothed_range
output[i] = 100.0 * normal_cdf(scale * output[i]) - 50.0
return output
[docs]
@njit
def price_volume_fit(
close: NDArray[np.float64],
volume: NDArray[np.float64],
lookback: int,
scale: float = 9.0,
) -> NDArray[np.float64]:
"""Computes Price Volume Fit.
Args:
close: Close prices.
volume: Trading volume.
lookback: Number of lookback bars.
scale: Increase > 1.0 for more compression of return values,
decrease < 1.0 for less. Defaults to ``9.0``.
Returns:
:class:`numpy.ndarray` of computed values ranging [-50, 50].
"""
assert len(close) == len(volume)
assert lookback > 0
assert scale > 0
n = len(close)
front_bad = lookback - 1
for first_volume in range(n):
if volume[first_volume] > 0:
break
front_bad += first_volume
if front_bad > n:
front_bad = n
output = np.zeros(n)
for i in range(front_bad, n):
x_mean = y_mean = 0.0
for j in range(lookback):
k = i - j
x_mean += np.log(volume[k] + 1.0)
y_mean += np.log(close[k])
x_mean /= lookback
y_mean /= lookback
xss = xy = 0.0
for j in range(lookback):
k = i - j
x_diff = np.log(volume[k] + 1.0) - x_mean
y_diff = np.log(close[k]) - y_mean
xss += x_diff * x_diff
xy += x_diff * y_diff
coef = xy / (xss + 1.0e-30)
output[i] = 100.0 * normal_cdf(scale * coef) - 50.0
return output
[docs]
@njit
def volume_weighted_ma_ratio(
close: NDArray[np.float64],
volume: NDArray[np.float64],
lookback: int,
scale: float = 1.0,
) -> NDArray[np.float64]:
"""Computes Volume-Weighted Moving Average Ratio.
Args:
close: Close prices.
volume: Trading volume.
lookback: Number of lookback bars.
scale: Increase > 1.0 for more compression of return values,
decrease < 1.0 for less. Defaults to ``1.0``.
Returns:
:class:`numpy.ndarray` of computed values ranging [-50, 50].
"""
assert len(close) == len(volume)
assert lookback > 0
assert scale > 0
n = len(close)
front_bad = lookback - 1
for first_volume in range(n):
if volume[first_volume] > 0:
break
front_bad += first_volume
if front_bad > n:
front_bad = n
output = np.zeros(n)
for i in range(front_bad, n):
total = numer = denom = 0.0
for j in range(i - lookback + 1, i + 1):
numer += volume[j] * close[j]
denom += close[j]
total += volume[j]
if total > 0.0:
output[i] = (
1000.0
* np.log(lookback * numer / (total * denom))
/ np.sqrt(lookback)
)
output[i] = 100.0 * normal_cdf(scale * output[i]) - 50.0
else:
output[i] = 0.0
return output
@njit
def _on_balance_volume(
close: NDArray[np.float64],
volume: NDArray[np.float64],
lookback: int,
delta_length: int,
scale: float,
volume_type: Literal["normalized", "delta"],
) -> NDArray[np.float64]:
assert len(close) == len(volume)
assert lookback > 0
assert delta_length >= 0
assert scale > 0
n = len(close)
front_bad = lookback
for first_volume in range(n):
if volume[first_volume] > 0:
break
front_bad += first_volume
if front_bad > n:
front_bad = n
output = np.zeros(n)
for i in range(front_bad, n):
signed_volume = total_volume = 0.0
for j in range(lookback):
if close[i - j] > close[i - j - 1]:
signed_volume += volume[i - j]
elif close[i - j] < close[i - j - 1]:
signed_volume -= volume[i - j]
total_volume += volume[i - j]
if total_volume <= 0.0:
output[i] = 0.0
continue
value = signed_volume / total_volume
value *= np.sqrt(lookback)
output[i] = 100.0 * normal_cdf(scale * value) - 50.0
if volume_type == "delta":
if delta_length < 1:
delta_length = 1
front_bad += delta_length
if front_bad > n:
front_bad = n
for i in range(n - 1, front_bad - 1, -1):
output[i] -= output[i - delta_length]
return output
[docs]
@njit
def normalized_on_balance_volume(
close: NDArray[np.float64],
volume: NDArray[np.float64],
lookback: int,
scale: float = 0.6,
) -> NDArray[np.float64]:
"""Computes Normalized On-Balance Volume.
Args:
close: Close prices.
volume: Trading volume.
lookback: Number of lookback bars.
scale: Increase > 1.0 for more compression of return values,
decrease < 1.0 for less. Defaults to ``0.6``.
Returns:
:class:`numpy.ndarray` of computed values ranging [-50, 50].
"""
return _on_balance_volume(close, volume, lookback, 0, scale, "normalized")
[docs]
@njit
def delta_on_balance_volume(
close: NDArray[np.float64],
volume: NDArray[np.float64],
lookback: int,
delta_length: int = 0,
scale: float = 0.6,
) -> NDArray[np.float64]:
"""Computes Delta On-Balance Volume.
Args:
close: Close prices.
volume: Trading volume.
lookback: Number of lookback bars.
delta_length: Lag for differencing.
scale: Increase > 1.0 for more compression of return values,
decrease < 1.0 for less. Defaults to ``0.6``.
Returns:
:class:`numpy.ndarray` of computed values ranging [-50, 50].
"""
return _on_balance_volume(
close, volume, lookback, delta_length, scale, "delta"
)
@njit
def _normalized_volume_index(
close: NDArray[np.float64],
volume: NDArray[np.float64],
lookback: int,
scale: float,
volume_type: Literal["positive", "negative"],
) -> NDArray[np.float64]:
assert len(close) == len(volume)
assert lookback > 0
assert scale > 0
n = len(close)
volatility_length = 2 * lookback
if volatility_length < 250:
volatility_length = 250
front_bad = volatility_length
for first_volume in range(n):
if volume[first_volume] > 0:
break
front_bad += first_volume
if front_bad > n:
front_bad = n
output = np.zeros(n)
for i in range(front_bad, n):
total = 0.0
if volume_type == "positive":
for j in range(lookback):
if volume[i - j] > volume[i - j - 1]:
total += np.log(close[i - j] / close[i - j - 1])
else:
for j in range(lookback):
if volume[i - j] < volume[i - j - 1]:
total += np.log(close[i - j] / close[i - j - 1])
total /= np.sqrt(lookback)
denom = np.sqrt(_variance(True, i, volatility_length, close))
if denom > 0.0:
total /= denom
output[i] = 100.0 * normal_cdf(scale * total) - 50.0
else:
output[i] = 0.0
return output
[docs]
@njit
def normalized_positive_volume_index(
close: NDArray[np.float64],
volume: NDArray[np.float64],
lookback: int,
scale: float = 0.5,
) -> NDArray[np.float64]:
"""Computes Normalized Positive Volume Index.
Args:
close: Close prices.
volume: Trading volume.
lookback: Number of lookback bars.
scale: Increase > 1.0 for more compression of return values,
decrease < 1.0 for less. Defaults to ``0.5``.
Returns:
:class:`numpy.ndarray` of computed values ranging [-50, 50].
"""
return _normalized_volume_index(close, volume, lookback, scale, "positive")
[docs]
@njit
def normalized_negative_volume_index(
close: NDArray[np.float64],
volume: NDArray[np.float64],
lookback: int,
scale: float = 0.5,
) -> NDArray[np.float64]:
"""Computes Normalized Negative Volume Index.
Args:
close: Close prices.
volume: Trading volume.
lookback: Number of lookback bars.
scale: Increase > 1.0 for more compression of return values,
decrease < 1.0 for less. Defaults to ``0.5``.
Returns:
:class:`numpy.ndarray` of computed values ranging [-50, 50].
"""
return _normalized_volume_index(close, volume, lookback, scale, "negative")
[docs]
@njit
def volume_momentum(
volume: NDArray[np.float64],
short_length: int,
multiplier: int = 2,
scale: float = 3.0,
) -> NDArray[np.float64]:
"""Computes Volume Momentum.
Args:
volume: Trading volume.
short_length: Number of short lookback bars.
multiplier: Lookback multiplier. Defaults to ``2``.
scale: Increase > 1.0 for more compression of return values,
decrease < 1.0 for less. Defaults to ``3.0``.
Returns:
:class:`numpy.ndarray` of computed values ranging [-50, 50].
"""
assert short_length > 0
assert multiplier >= 1
assert scale > 0
n = len(volume)
if multiplier < 2:
multiplier = 2
long_length = short_length * multiplier
front_bad = long_length - 1
for first_volume in range(n):
if volume[first_volume] > 0:
break
front_bad += first_volume
if front_bad > n:
front_bad = n
output = np.zeros(n)
denom = np.exp(np.log(multiplier) / 3.0)
for i in range(front_bad, n):
short_sum = 0.0
for j in range(i - short_length + 1, i + 1):
short_sum += volume[j]
long_sum = short_sum
for j in range(i - long_length + 1, i - short_length + 1):
long_sum += volume[j]
short_sum /= short_length
long_sum /= long_length
if long_sum > 0.0 and short_sum > 0.0:
output[i] = np.log(short_sum / long_sum) / denom
output[i] = 100.0 * normal_cdf(scale * output[i]) - 50.0
else:
output[i] = 0.0
return output
[docs]
@njit
def laguerre_rsi(
open: NDArray[np.float64],
high: NDArray[np.float64],
low: NDArray[np.float64],
close: NDArray[np.float64],
fe_length: int = 13,
) -> NDArray[np.float64]:
"""Computes Laguerre Relative Strength Index (RSI).
Args:
open: Open prices.
high: High prices.
low: Low prices.
close: Close prices.
fe_length: Fractal Energy length. Defaults to ``13``.
Returns:
:class:`numpy.ndarray` of computed values.
"""
assert (
len(open) == len(high)
and len(open) == len(low)
and len(open) == len(close)
)
assert fe_length > 0
n = len(close)
output = np.zeros(n)
if n <= fe_length:
return output
alpha = np.zeros(n)
L0_1, L1_1, L2_1, L3_1 = 0.0, 0.0, 0.0, 0.0
for i in range(fe_length, n):
OC = (open[i] + close[i - 1]) / 2.0
HC = max(high[i], close[i - 1])
LC = min(low[i], close[i - 1])
fe_src = (OC + HC + LC + close[i]) / 4.0
highest = max(high[i + 1 - fe_length : i + 1])
lowest = min(low[i + 1 - fe_length : i + 1])
denom = highest - lowest
if denom == 0:
output[i] = alpha[i] = 0
continue
s = 0
for i in range(fe_length):
diff = max(high[i - i], close[i - i - 1]) - min(
low[i - i], close[i - i - 1]
)
s += diff / denom
fe_alpha = np.log(s) / np.log(fe_length)
alpha[i] = fe_alpha * 100
L0 = fe_alpha * fe_src + (1 - fe_alpha) * L0_1
L1 = -(1 - fe_alpha) * L0 + L0_1 + (1 - fe_alpha) * L1_1
L2 = -(1 - fe_alpha) * L1 + L1_1 + (1 - fe_alpha) * L2_1
L3 = -(1 - fe_alpha) * L2 + L2_1 + (1 - fe_alpha) * L3_1
CU = (
(L0 - L1 if L0 >= L1 else 0)
+ (L1 - L2 if L1 >= L2 else 0)
+ (L2 - L3 if L2 >= L3 else 0)
)
CD = (
(0 if L0 >= L1 else L1 - L0)
+ (0 if L1 >= L2 else L2 - L1)
+ (0 if L2 >= L3 else L3 - L2)
)
lrsi = CU / (CU + CD) if CU + CD != 0 else 0
output[i] = lrsi * 100
L0_1, L1_1, L2_1, L3_1 = L0, L1, L2, L3
return output