Source code for pybroker.log

"""Logging module."""

"""Copyright (C) 2023 Edward West. All rights reserved.

This code is licensed under Apache 2.0 with Commons Clause license
(see LICENSE for details).
"""

import datetime
import logging
import numpy as np
import time
from pybroker.common import Day, IndicatorSymbol, ModelSymbol, to_datetime
from decimal import Decimal
from progressbar import ProgressBar
from typing import Iterable, Optional, Sequence, Sized


class Logger:
    """Class for logging information about triggered events.

    Progress messages are printed to stdout, while the ``info_*``,
    ``debug_*`` and ``warn_*`` methods write to the root :mod:`logging`
    logger. All output is suppressed after :meth:`disable` is called.

    Args:
        scope: :class:`pybroker.scope.StaticScope`.
    """

    def __init__(self, scope):
        self._scope = scope
        self._progress_bar: Optional[ProgressBar] = None
        # Start timestamps (seconds since the epoch) of in-flight operations.
        # Each one is None while the corresponding operation is not running.
        self._download_start_time: Optional[float] = None
        self._train_split_start_time: Optional[float] = None
        self._train_model_start_time: Optional[float] = None
        self._walkforward_start_time: Optional[float] = None
        self._bootstrap_start_time: Optional[float] = None
        self._progress_bar_disabled = False
        self._disabled = False

    def _start_progress_bar(self, message: str, total_count: int):
        """Prints ``message`` and starts a progress bar of ``total_count``
        steps.

        When only the progress bar is disabled, the message is still
        printed. When logging is disabled, nothing happens.
        """
        if self._disabled:
            return
        if self._progress_bar_disabled:
            print(message, flush=True)
            return
        self._progress_bar = ProgressBar(max_value=total_count)
        self._out(message)
        self._progress_bar.update(0)

    def _update_progress_bar(self, count: int):
        """Advances the active progress bar to ``count``.

        Once ``count`` reaches the bar's ``max_value``, the bar is finished
        and discarded, and an empty line is printed.
        """
        if (
            self._progress_bar is None
            or self._disabled
            or self._progress_bar_disabled
        ):
            return
        self._progress_bar.update(count)
        if count == self._progress_bar.max_value:
            self._progress_bar.finish()
            self._progress_bar = None
            self._out("")

    def disable(self):
        """Disables logging."""
        self._disabled = True

    def enable(self):
        """Enables logging."""
        self._disabled = False

    def disable_progress_bar(self):
        """Disables logging a progress bar."""
        self._progress_bar_disabled = True

    def enable_progress_bar(self):
        """Enables logging a progress bar."""
        self._progress_bar_disabled = False

    def download_bar_data_start(self):
        """Prints that bar data is loading and records the start time."""
        self._out("Loading bar data...")
        self._download_start_time = time.time()

    def info_download_bar_data_start(
        self,
        symbols: Iterable[str],
        start_date: datetime.datetime,
        end_date: datetime.datetime,
        timeframe: str,
    ):
        """Logs the date range, timeframe and sorted symbols being loaded."""
        self._info(
            "Loading:\n"
            f"{start_date} to {end_date}\n"
            f"timeframe: {timeframe}\n"
            f"{sorted(symbols)}"
        )

    def loaded_bar_data(self):
        """Prints that cached bar data was loaded."""
        self._out("Loaded cached bar data.\n")

    def info_loaded_bar_data(
        self,
        symbols: Iterable[str],
        start_date: datetime.datetime,
        end_date: datetime.datetime,
        timeframe: str,
    ):
        """Logs the cache namespace, date range, timeframe and sorted
        symbols of bar data that was loaded.
        """
        # The message must be one string: extra positional arguments would
        # be treated by ``logging`` as %-format arguments and, since the
        # message has no placeholders, would make formatting fail.
        self._info(
            "Loaded:\n"
            f"namespace={self._scope.data_source_cache_ns}\n"
            f"{start_date} to {end_date}\n"
            f"timeframe: {timeframe}\n"
            f"{sorted(symbols)}"
        )

    def info_invalidate_data_source_cache(self):
        """Logs that the data source cache is being invalidated because of
        mismatched columns.
        """
        self._info(
            "Mismatched columns in data source cache:\n"
            f"namespace={self._scope.data_source_cache_ns}\n"
            "Invalidating cache..."
        )

    def debug_get_data_source_cache(self, cache_key):
        """Logs the key fetched from the data source cache."""
        self._debug(f"Fetched data source cache:\n{cache_key}")

    def debug_set_data_source_cache(self, cache_key):
        """Logs the key stored in the data source cache."""
        self._debug(f"Set data source cache:\n{cache_key}")

    def download_bar_data_completed(self):
        """Prints the elapsed time since :meth:`download_bar_data_start`.

        Does nothing when no start time was recorded.
        """
        if self._download_start_time is None:
            return
        self._out(
            "Loaded bar data:",
            self._format_time(self._download_start_time),
            "\n",
        )
        self._download_start_time = None

    def indicator_data_start(self, ind_syms: Sized):
        """Starts a progress bar with one step per entry of ``ind_syms``."""
        self._start_progress_bar("Computing indicators...", len(ind_syms))

    def info_indicator_data_start(self, ind_syms: Iterable[IndicatorSymbol]):
        """Logs the sorted indicators that will be computed."""
        self._info(f"Indicators: {sorted(ind_syms)}")

    def loaded_indicator_data(self):
        """Prints that cached indicator data was loaded."""
        self._out("Loaded cached indicator data.\n")

    def info_loaded_indicator_data(self, ind_syms: Iterable[IndicatorSymbol]):
        """Logs the cache namespace and sorted indicators that were loaded."""
        self._info(
            "Loaded:\n"
            f"namespace={self._scope.indicator_cache_ns}\n"
            f"{sorted(ind_syms)}"
        )

    def indicator_data_loading(self, count: int):
        """Advances the indicator progress bar to ``count``."""
        self._update_progress_bar(count)

    def debug_get_indicator_cache(self, cache_key):
        """Logs the key fetched from the indicator cache."""
        self._debug(f"Fetched indicator cache:\n{cache_key}")

    def debug_set_indicator_cache(self, cache_key):
        """Logs the key stored in the indicator cache."""
        self._debug(f"Set indicator cache:\n{cache_key}")

    def debug_compute_indicators(self, is_parallel: bool):
        """Logs whether indicators are computed in parallel or in serial."""
        self._debug(
            "Computing indicators in parallel."
            if is_parallel
            else "Computing indicators in serial."
        )

    def train_split_start(self, train_dates: Sequence[np.datetime64]):
        """Prints the first and last train dates and records the start time.

        ``train_dates`` must be non-empty because its first and last
        elements are read.
        """
        start_date = to_datetime(train_dates[0])
        end_date = to_datetime(train_dates[-1])
        self._out(f"Train split: {start_date} to {end_date}")
        self._train_split_start_time = time.time()

    def info_train_split_start(self, model_syms: Iterable[ModelSymbol]):
        """Logs the sorted models of the train split."""
        self._info(f"Models: {sorted(model_syms)}")

    def loaded_models(self):
        """Prints that cached models were loaded."""
        self._out("Loaded cached models.\n")

    def info_loaded_models(self, model_syms: Iterable[ModelSymbol]):
        """Logs the cache namespace and sorted models that were loaded."""
        self._info(
            "Loaded:\n"
            f"namespace={self._scope.model_cache_ns}\n"
            f"{sorted(model_syms)}"
        )

    def info_train_model_start(self, model_sym: ModelSymbol):
        """Logs that training of ``model_sym`` began and records the start
        time.
        """
        self._info(f"Training model: {model_sym}")
        self._train_model_start_time = time.time()

    def info_train_model_completed(self, model_sym: ModelSymbol):
        """Logs the elapsed time since :meth:`info_train_model_start`.

        Does nothing when no start time was recorded.
        """
        if self._train_model_start_time is None:
            return
        # The elapsed time is interpolated into the message rather than
        # passed as an extra argument: ``logging`` would treat it as a
        # %-format argument for a message without placeholders.
        elapsed = self._format_time(self._train_model_start_time)
        self._info(f"Finished training model {model_sym}: {elapsed}")
        self._train_model_start_time = None

    def info_loaded_model(self, model_sym: ModelSymbol):
        """Logs that ``model_sym`` was loaded."""
        self._info(f"Loaded model: {model_sym}")

    def debug_get_model_cache(self, cache_key):
        """Logs the key fetched from the model cache."""
        self._debug(f"Fetched model cache:\n{cache_key}")

    def debug_set_model_cache(self, cache_key):
        """Logs the key stored in the model cache."""
        self._debug(f"Set model cache:\n{cache_key}")

    def train_split_completed(self):
        """Prints the elapsed time since :meth:`train_split_start`.

        Does nothing when no start time was recorded.
        """
        if self._train_split_start_time is None:
            return
        self._out(
            "Finished training models:",
            self._format_time(self._train_split_start_time),
            "\n",
        )
        self._train_split_start_time = None

    def backtest_executions_start(self, test_dates: Sequence[np.datetime64]):
        """Starts a progress bar with one step per test date.

        Does nothing when ``test_dates`` is empty.
        """
        # len() is used instead of truthiness so that array-like inputs
        # with ambiguous truth values are supported.
        if not len(test_dates):
            return
        start_date = to_datetime(test_dates[0])
        end_date = to_datetime(test_dates[-1])
        self._start_progress_bar(
            f"Test split: {start_date} to {end_date}", len(test_dates)
        )

    def backtest_executions_loading(self, count: int):
        """Advances the backtest progress bar to ``count``."""
        self._update_progress_bar(count)

    def walkforward_start(
        self, start_date: datetime.datetime, end_date: datetime.datetime
    ):
        """Prints the backtest date range and records the start time."""
        self._out(f"Backtesting: {start_date} to {end_date}\n")
        self._walkforward_start_time = time.time()

    def info_walkforward_between_time(self, between_time: tuple[str, str]):
        """Logs the ``between_time`` pair used for the backtest."""
        self._info(f"Backtest between times: {between_time}")

    def info_walkforward_on_days(self, days: tuple[int, ...]):
        """Logs the names of the days used for the backtest."""
        # Build a list: interpolating a lazy ``map`` object would log its
        # repr ("<map object at 0x...>") instead of the day names.
        day_names = [Day(d).name for d in days]
        self._info(f"Backtest on days: {day_names}")

    def walkforward_completed(self):
        """Prints the elapsed time since :meth:`walkforward_start`.

        Does nothing when no start time was recorded.
        """
        if self._walkforward_start_time is None:
            return
        self._out(
            "Finished backtest:",
            self._format_time(self._walkforward_start_time),
        )
        self._walkforward_start_time = None

    def calc_bootstrap_metrics_start(self, samples, sample_size):
        """Prints the bootstrap parameters and records the start time."""
        self._out(
            f"Calculating bootstrap metrics: sample_size={sample_size}, "
            f"samples={samples}..."
        )
        self._bootstrap_start_time = time.time()

    def calc_bootstrap_metrics_completed(self):
        """Prints the elapsed time since
        :meth:`calc_bootstrap_metrics_start`.

        Does nothing when no start time was recorded.
        """
        if self._bootstrap_start_time is None:
            return
        self._out(
            "Calculated bootstrap metrics:",
            self._format_time(self._bootstrap_start_time),
            "\n",
        )
        self._bootstrap_start_time = None

    def debug_place_buy_order(
        self,
        date: np.datetime64,
        symbol: str,
        shares: Decimal,
        fill_price: Decimal,
        limit_price: Optional[Decimal],
    ):
        """Logs a buy order that is being placed."""
        self._debug_order(
            "Placing buy order", date, symbol, shares, fill_price, limit_price
        )

    def debug_buy_shares_exceed_cash(
        self,
        date: np.datetime64,
        symbol: str,
        shares: Decimal,
        fill_price: Decimal,
        limit_price: Optional[Decimal],
        cash: Decimal,
        clamped_shares: Decimal,
    ):
        """Logs a buy order whose amount exceeds the available ``cash``,
        along with the ``clamped_shares`` that will be used instead.
        """
        order = self._format_order(
            date=date,
            symbol=symbol,
            shares=shares,
            fill_price=fill_price,
            limit_price=limit_price,
        )
        self._debug(
            f"Buy order amount exceeds available cash={cash}:\n{order}\n"
            f"Setting buy_shares={clamped_shares}."
        )

    def debug_filled_buy_order(
        self,
        date: np.datetime64,
        symbol: str,
        shares: Decimal,
        fill_price: Decimal,
        limit_price: Optional[Decimal],
    ):
        """Logs a buy order that was filled."""
        self._debug_order(
            "Filled buy order", date, symbol, shares, fill_price, limit_price
        )

    def debug_unfilled_buy_order(
        self,
        date: np.datetime64,
        symbol: str,
        shares: Decimal,
        fill_price: Decimal,
        limit_price: Optional[Decimal],
    ):
        """Logs a buy order that was not filled."""
        self._debug_order(
            "Unfilled buy order",
            date,
            symbol,
            shares,
            fill_price,
            limit_price,
        )

    def debug_place_sell_order(
        self,
        date: np.datetime64,
        symbol: str,
        shares: Decimal,
        fill_price: Decimal,
        limit_price: Optional[Decimal],
    ):
        """Logs a sell order that is being placed."""
        self._debug_order(
            "Placing sell order",
            date,
            symbol,
            shares,
            fill_price,
            limit_price,
        )

    def debug_filled_sell_order(
        self,
        date: np.datetime64,
        symbol: str,
        shares: Decimal,
        fill_price: Decimal,
        limit_price: Optional[Decimal],
    ):
        """Logs a sell order that was filled."""
        self._debug_order(
            "Filled sell order", date, symbol, shares, fill_price, limit_price
        )

    def debug_unfilled_sell_order(
        self,
        date: np.datetime64,
        symbol: str,
        shares: Decimal,
        fill_price: Decimal,
        limit_price: Optional[Decimal],
    ):
        """Logs a sell order that was not filled."""
        self._debug_order(
            "Unfilled sell order",
            date,
            symbol,
            shares,
            fill_price,
            limit_price,
        )

    def debug_schedule_order(self, date: np.datetime64, exec_result):
        """Logs an order that is being scheduled for ``date``."""
        self._debug(f"Scheduling order: {date}\n{exec_result}")

    def debug_unscheduled_order(self, exec_result):
        """Logs an order that was unscheduled."""
        self._debug(f"Unscheduled order:\n{exec_result}")

    def warn_bootstrap_sample_size(self, n: int, sample_size: int):
        """Warns that the returns length ``n`` is smaller than
        ``sample_size``.
        """
        self._warn(
            f"Returns length {n} < sample size {sample_size}.\n"
            "Setting number of bootstraps to 1."
        )

    def debug_enable_data_source_cache(self, ns: str, cache_dir: str):
        """Logs that the data source cache was enabled."""
        self._debug(
            "Enabled data source cache:\n"
            f"namespace={ns}\n"
            f"dir={cache_dir}"
        )

    def debug_disable_data_source_cache(self):
        """Logs that the data source cache was disabled."""
        self._debug("Disabled data source cache.")

    def debug_clear_data_source_cache(self, cache_dir: str):
        """Logs that the data source cache was cleared."""
        self._debug(f"Cleared data source cache: {cache_dir}")

    def debug_enable_indicator_cache(self, ns: str, cache_dir: str):
        """Logs that the indicator cache was enabled."""
        self._debug(
            "Enabled indicator cache:\n"
            f"namespace={ns}\n"
            f"dir={cache_dir}"
        )

    def debug_disable_indicator_cache(self):
        """Logs that the indicator cache was disabled."""
        self._debug("Disabled indicator cache.")

    def debug_clear_indicator_cache(self, cache_dir: str):
        """Logs that the indicator cache was cleared."""
        self._debug(f"Cleared indicator cache: {cache_dir}")

    def debug_enable_model_cache(self, ns: str, cache_dir: str):
        """Logs that the model cache was enabled."""
        self._debug(
            "Enabled model cache:\n"
            f"namespace={ns}\n"
            f"dir={cache_dir}"
        )

    def debug_disable_model_cache(self):
        """Logs that the model cache was disabled."""
        self._debug("Disabled model cache.")

    def debug_clear_model_cache(self, cache_dir: str):
        """Logs that the model cache was cleared."""
        self._debug(f"Cleared model cache: {cache_dir}")

    def _out(self, msg: str, *args):
        """Prints ``msg`` and ``args`` to stdout unless logging is
        disabled.
        """
        if self._disabled:
            return
        print(msg, *args, flush=True)

    def _info(self, msg: str, *args):
        """Logs ``msg`` at INFO level unless logging is disabled.

        ``args`` are forwarded to :func:`logging.info` as %-style format
        arguments for ``msg``.
        """
        if self._disabled:
            return
        logging.info(msg, *args)

    def _debug(self, msg: str, *args):
        """Logs ``msg`` at DEBUG level unless logging is disabled.

        ``args`` are forwarded to :func:`logging.debug` as %-style format
        arguments for ``msg``.
        """
        if self._disabled:
            return
        logging.debug(msg, *args)

    def _warn(self, msg: str, *args):
        """Logs ``msg`` at WARNING level unless logging is disabled.

        ``args`` are forwarded to :func:`logging.warning` as %-style format
        arguments for ``msg``.
        """
        if self._disabled:
            return
        logging.warning(msg, *args)

    def _debug_order(
        self,
        title: str,
        date: np.datetime64,
        symbol: str,
        shares: Decimal,
        fill_price: Decimal,
        limit_price: Optional[Decimal],
    ):
        """Logs ``title`` followed by the formatted order at DEBUG level."""
        order = self._format_order(
            date=date,
            symbol=symbol,
            shares=shares,
            fill_price=fill_price,
            limit_price=limit_price,
        )
        self._debug(f"{title}:\n{order}")

    def _format_order(
        self,
        date: np.datetime64,
        symbol: str,
        shares: Decimal,
        fill_price: Decimal,
        limit_price: Optional[Decimal],
    ):
        """Returns the order fields as newline-terminated ``key=value``
        lines.
        """
        return (
            f"date={to_datetime(date)}\n"
            f"symbol={symbol}\n"
            f"shares={shares}\n"
            f"fill_price={fill_price}\n"
            f"limit_price={limit_price}\n"
        )

    def _format_time(self, start_seconds: float) -> str:
        """Returns the time elapsed since ``start_seconds``, rounded to the
        nearest second, formatted as a :class:`datetime.timedelta` string.
        """
        delta = time.time() - start_seconds
        return str(datetime.timedelta(seconds=round(delta)))