The notebook linear_regression.ipynb contains examples for the prediction of stock prices using OLS with statsmodels and sklearn, as well as ridge and lasso models.
It is designed to run as a notebook on the Quantopian research platform.
import pandas as pd
import numpy as np
from time import time
import talib
import re
from statsmodels.api import OLS
from sklearn.metrics import mean_squared_error
from scipy.stats import spearmanr, pearsonr
from sklearn.linear_model import LinearRegression, Ridge, RidgeCV, Lasso, LassoCV, LogisticRegression
from sklearn.preprocessing import StandardScaler
from quantopian.research import run_pipeline
from quantopian.pipeline import Pipeline, factors, filters, classifiers
from quantopian.pipeline.data.builtin import USEquityPricing
from quantopian.pipeline.factors import (Latest,
Returns,
AverageDollarVolume,
SimpleMovingAverage,
EWMA,
BollingerBands,
CustomFactor,
MarketCap,
SimpleBeta)
from quantopian.pipeline.filters import QTradableStocksUS, StaticAssets
from quantopian.pipeline.data.quandl import fred_usdontd156n as libor
from empyrical import max_drawdown, sortino_ratio
import seaborn as sns
import matplotlib.pyplot as plt
################
# Fundamentals #
################
# Morningstar fundamentals (2002 - Ongoing)
# https://www.quantopian.com/help/fundamentals
from quantopian.pipeline.data import Fundamentals
#####################
# Analyst Estimates #
#####################
# Earnings Surprises - Zacks (27 May 2006 - Ongoing)
# https://www.quantopian.com/data/zacks/earnings_surprises
from quantopian.pipeline.data.zacks import EarningsSurprises
from quantopian.pipeline.factors.zacks import BusinessDaysSinceEarningsSurprisesAnnouncement
##########
# Events #
##########
# Buyback Announcements - EventVestor (01 Jun 2007 - Ongoing)
# https://www.quantopian.com/data/eventvestor/buyback_auth
from quantopian.pipeline.data.eventvestor import BuybackAuthorizations
from quantopian.pipeline.factors.eventvestor import BusinessDaysSinceBuybackAuth
# CEO Changes - EventVestor (01 Jan 2007 - Ongoing)
# https://www.quantopian.com/data/eventvestor/ceo_change
from quantopian.pipeline.data.eventvestor import CEOChangeAnnouncements
# Dividends - EventVestor (01 Jan 2007 - Ongoing)
# https://www.quantopian.com/data/eventvestor/dividends
from quantopian.pipeline.data.eventvestor import (
DividendsByExDate,
DividendsByPayDate,
DividendsByAnnouncementDate,
)
from quantopian.pipeline.factors.eventvestor import (
BusinessDaysSincePreviousExDate,
BusinessDaysUntilNextExDate,
BusinessDaysSinceDividendAnnouncement,
)
# Earnings Calendar - EventVestor (01 Jan 2007 - Ongoing)
# https://www.quantopian.com/data/eventvestor/earnings_calendar
from quantopian.pipeline.data.eventvestor import EarningsCalendar
from quantopian.pipeline.factors.eventvestor import (
BusinessDaysUntilNextEarnings,
BusinessDaysSincePreviousEarnings
)
# 13D Filings - EventVestor (01 Jan 2007 - Ongoing)
# https://www.quantopian.com/data/eventvestor/_13d_filings
from quantopian.pipeline.data.eventvestor import _13DFilings
from quantopian.pipeline.factors.eventvestor import BusinessDaysSince13DFilingsDate
#############
# Sentiment #
#############
# News Sentiment - Sentdex Sentiment Analysis (15 Oct 2012 - Ongoing)
# https://www.quantopian.com/data/sentdex/sentiment
from quantopian.pipeline.data.sentdex import sentiment
We need to select a universe of equities and a time horizon, build and transform alpha factors that we will use as features, calculate forward returns that we aim to predict, and potentially clean our data.
# trading days per period
MONTH = 21
YEAR = 12 * MONTH
START = '2017-01-01'
END = '2018-12-31'
We will use equity data for 2017 and 2018 from a custom Q50US universe that uses built-in filters, factors, and classifiers to select the 50 stocks with the highest average dollar volume over the last 200 trading days, subject to additional default criteria (see the Quantopian docs linked on GitHub for details). The universe updates dynamically based on the filter criteria so that, while there are 50 stocks at any given point in time, there may be more than 50 distinct equities in the sample:
def Q50US():
return filters.make_us_equity_universe(
target_size=50,
rankby=factors.AverageDollarVolume(window_length=200),
mask=filters.default_us_equity_universe_mask(),
groupby=classifiers.fundamentals.Sector(),
max_group_weight=0.3,
smoothing_func=lambda f: f.downsample('month_start'),
)
# UNIVERSE = StaticAssets(symbols(['MSFT', 'AAPL']))
UNIVERSE = Q50US()
class AnnualizedData(CustomFactor):
# Get the sum of the last 4 reported values
window_length = 260
def compute(self, today, assets, out, asof_date, values):
for asset in range(len(assets)):
# unique asof dates indicate availability of new figures
_, filing_dates = np.unique(asof_date[:, asset], return_index=True)
quarterly_values = values[filing_dates[-4:], asset]
# ignore annual windows with <4 quarterly data points
            if np.count_nonzero(~np.isnan(quarterly_values)) != 4:
out[asset] = np.nan
else:
out[asset] = np.sum(quarterly_values)
class AnnualAvg(CustomFactor):
window_length = 252
def compute(self, today, assets, out, values):
out[:] = (values[0] + values[-1])/2
def run_pipeline_chunks(pipe, start_date, end_date, chunks_len = None):
chunks = []
current = pd.Timestamp(start_date)
end = pd.Timestamp(end_date)
step = pd.Timedelta(weeks=26) if chunks_len is None else chunks_len
start_pipeline_timer = time()
while current <= end:
current_end = current + step
if current_end > end:
current_end = end
start_timer = time()
        print('Running pipeline: {} - {}'.format(current, current_end))
results = run_pipeline(pipe, current.strftime("%Y-%m-%d"), current_end.strftime("%Y-%m-%d"))
chunks.append(results)
# pipeline returns more days than requested (if no trading day), so get last date from the results
current_end = results.index.get_level_values(0)[-1].tz_localize(None)
current = current_end + pd.Timedelta(days=1)
end_timer = time()
print "Time to run this chunk of the pipeline %.2f secs" % (end_timer - start_timer)
end_pipeline_timer = time()
print "Time to run the entire pipeline %.2f secs" % (end_pipeline_timer - start_pipeline_timer)
return pd.concat(chunks)
def factor_pipeline(factors):
start = time()
pipe = Pipeline({k: v(mask=UNIVERSE).rank() for k, v in factors.items()},
screen=UNIVERSE)
result = run_pipeline_chunks(pipe, start_date=START, end_date=END)
return result, time() - start
class ValueFactors:
"""Definitions of factors for cross-sectional trading algorithms"""
@staticmethod
def PriceToSalesTTM(**kwargs):
"""Last closing price divided by sales per share"""
return Fundamentals.ps_ratio.latest
@staticmethod
def PriceToEarningsTTM(**kwargs):
"""Closing price divided by earnings per share (EPS)"""
return Fundamentals.pe_ratio.latest
@staticmethod
def PriceToDilutedEarningsTTM(mask):
"""Closing price divided by diluted EPS"""
last_close = USEquityPricing.close.latest
diluted_eps = AnnualizedData(inputs = [Fundamentals.diluted_eps_earnings_reports_asof_date,
Fundamentals.diluted_eps_earnings_reports],
mask=mask)
return last_close / diluted_eps
@staticmethod
def PriceToForwardEarnings(**kwargs):
"""Price to Forward Earnings"""
return Fundamentals.forward_pe_ratio.latest
@staticmethod
def DividendYield(**kwargs):
"""Dividends per share divided by closing price"""
return Fundamentals.trailing_dividend_yield.latest
@staticmethod
def PriceToFCF(mask):
"""Price to Free Cash Flow"""
last_close = USEquityPricing.close.latest
fcf_share = AnnualizedData(inputs = [Fundamentals.fcf_per_share_asof_date,
Fundamentals.fcf_per_share],
mask=mask)
return last_close / fcf_share
@staticmethod
def PriceToOperatingCashflow(mask):
"""Last Close divided by Operating Cash Flows"""
last_close = USEquityPricing.close.latest
cfo_per_share = AnnualizedData(inputs = [Fundamentals.cfo_per_share_asof_date,
Fundamentals.cfo_per_share],
mask=mask)
return last_close / cfo_per_share
@staticmethod
def PriceToBook(mask):
"""Closing price divided by book value"""
last_close = USEquityPricing.close.latest
book_value_per_share = AnnualizedData(inputs = [Fundamentals.book_value_per_share_asof_date,
Fundamentals.book_value_per_share],
mask=mask)
return last_close / book_value_per_share
@staticmethod
def EVToFCF(mask):
"""Enterprise Value divided by Free Cash Flows"""
fcf = AnnualizedData(inputs = [Fundamentals.free_cash_flow_asof_date,
Fundamentals.free_cash_flow],
mask=mask)
return Fundamentals.enterprise_value.latest / fcf
@staticmethod
def EVToEBITDA(mask):
"""Enterprise Value to Earnings Before Interest, Taxes, Deprecation and Amortization (EBITDA)"""
ebitda = AnnualizedData(inputs = [Fundamentals.ebitda_asof_date,
Fundamentals.ebitda],
mask=mask)
return Fundamentals.enterprise_value.latest / ebitda
@staticmethod
def EBITDAYield(mask):
"""EBITDA divided by latest close"""
ebitda = AnnualizedData(inputs = [Fundamentals.ebitda_asof_date,
Fundamentals.ebitda],
mask=mask)
return USEquityPricing.close.latest / ebitda
VALUE_FACTORS = {
'DividendYield' : ValueFactors.DividendYield,
'EBITDAYield' : ValueFactors.EBITDAYield,
'EVToEBITDA' : ValueFactors.EVToEBITDA,
'EVToFCF' : ValueFactors.EVToFCF,
'PriceToBook' : ValueFactors.PriceToBook,
'PriceToDilutedEarningsTTM': ValueFactors.PriceToDilutedEarningsTTM,
'PriceToEarningsTTM' : ValueFactors.PriceToEarningsTTM,
'PriceToFCF' : ValueFactors.PriceToFCF,
'PriceToForwardEarnings' : ValueFactors.PriceToForwardEarnings,
'PriceToOperatingCashflow' : ValueFactors.PriceToOperatingCashflow,
'PriceToSalesTTM' : ValueFactors.PriceToSalesTTM,
}
value_factors, t = factor_pipeline(VALUE_FACTORS)
print('Pipeline run time {:.2f} secs'.format(t))
value_factors.info()
class MomentumFactors:
"""Custom Momentum Factors"""
class PercentAboveLow(CustomFactor):
"""Percentage of current close above low
in lookback window of window_length days
"""
inputs = [USEquityPricing.close]
window_length = 252
def compute(self, today, assets, out, close):
out[:] = close[-1] / np.min(close, axis=0) - 1
class PercentBelowHigh(CustomFactor):
"""Percentage of current close below high
in lookback window of window_length days
"""
inputs = [USEquityPricing.close]
window_length = 252
def compute(self, today, assets, out, close):
out[:] = close[-1] / np.max(close, axis=0) - 1
@staticmethod
def make_dx(timeperiod=14):
class DX(CustomFactor):
"""Directional Movement Index"""
inputs = [USEquityPricing.high,
USEquityPricing.low,
USEquityPricing.close]
window_length = timeperiod + 1
def compute(self, today, assets, out, high, low, close):
out[:] = [talib.DX(high[:, i],
low[:, i],
close[:, i],
timeperiod=timeperiod)[-1]
for i in range(len(assets))]
return DX
@staticmethod
def make_mfi(timeperiod=14):
class MFI(CustomFactor):
"""Money Flow Index"""
inputs = [USEquityPricing.high,
USEquityPricing.low,
USEquityPricing.close,
USEquityPricing.volume]
window_length = timeperiod + 1
def compute(self, today, assets, out, high, low, close, vol):
out[:] = [talib.MFI(high[:, i],
low[:, i],
close[:, i],
vol[:, i],
timeperiod=timeperiod)[-1]
for i in range(len(assets))]
return MFI
@staticmethod
def make_oscillator(fastperiod=12, slowperiod=26, matype=0):
class PPO(CustomFactor):
"""12/26-Day Percent Price Oscillator"""
inputs = [USEquityPricing.close]
window_length = slowperiod
def compute(self, today, assets, out, close_prices):
out[:] = [talib.PPO(close,
fastperiod=fastperiod,
slowperiod=slowperiod,
matype=matype)[-1]
for close in close_prices.T]
return PPO
@staticmethod
def make_stochastic_oscillator(fastk_period=5, slowk_period=3, slowd_period=3,
slowk_matype=0, slowd_matype=0):
class StochasticOscillator(CustomFactor):
"""20-day Stochastic Oscillator """
inputs = [USEquityPricing.high,
USEquityPricing.low,
USEquityPricing.close]
outputs = ['slowk', 'slowd']
window_length = fastk_period * 2
def compute(self, today, assets, out, high, low, close):
                stoch = [talib.STOCH(high[:, i],
                                     low[:, i],
                                     close[:, i],
                                     fastk_period=fastk_period,
                                     slowk_period=slowk_period,
                                     slowk_matype=slowk_matype,
                                     slowd_period=slowd_period,
                                     slowd_matype=slowd_matype)
                         for i in range(len(assets))]
                # talib.STOCH returns (slowk, slowd) arrays; keep the latest value per asset
                out.slowk[:] = [k[-1] for k, d in stoch]
                out.slowd[:] = [d[-1] for k, d in stoch]
return StochasticOscillator
@staticmethod
def make_trendline(timeperiod=252):
        class Trendline(CustomFactor):
            """52-Week Trendline"""
            inputs = [USEquityPricing.close]
window_length = timeperiod
def compute(self, today, assets, out, close_prices):
out[:] = [talib.LINEARREG_SLOPE(close,
timeperiod=timeperiod)[-1]
for close in close_prices.T]
return Trendline
MOMENTUM_FACTORS = {
'Percent Above Low' : MomentumFactors.PercentAboveLow,
'Percent Below High' : MomentumFactors.PercentBelowHigh,
'Price Oscillator' : MomentumFactors.make_oscillator(),
'Money Flow Index' : MomentumFactors.make_mfi(),
'Directional Movement Index' : MomentumFactors.make_dx(),
'Trendline' : MomentumFactors.make_trendline()
}
momentum_factors, t = factor_pipeline(MOMENTUM_FACTORS)
print('Pipeline run time {:.2f} secs'.format(t))
momentum_factors.info()
class EfficiencyFactors:
@staticmethod
def CapexToAssets(mask):
"""Capital Expenditure divided by Total Assets"""
capex = AnnualizedData(inputs = [Fundamentals.capital_expenditure_asof_date,
Fundamentals.capital_expenditure],
mask=mask)
assets = Fundamentals.total_assets.latest
return - capex / assets
@staticmethod
def CapexToSales(mask):
"""Capital Expenditure divided by Total Revenue"""
capex = AnnualizedData(inputs = [Fundamentals.capital_expenditure_asof_date,
Fundamentals.capital_expenditure],
mask=mask)
revenue = AnnualizedData(inputs = [Fundamentals.total_revenue_asof_date,
Fundamentals.total_revenue],
mask=mask)
return - capex / revenue
@staticmethod
def CapexToFCF(mask):
"""Capital Expenditure divided by Free Cash Flows"""
capex = AnnualizedData(inputs = [Fundamentals.capital_expenditure_asof_date,
Fundamentals.capital_expenditure],
mask=mask)
free_cash_flow = AnnualizedData(inputs = [Fundamentals.free_cash_flow_asof_date,
Fundamentals.free_cash_flow],
mask=mask)
return - capex / free_cash_flow
@staticmethod
def EBITToAssets(mask):
"""Earnings Before Interest and Taxes (EBIT) divided by Total Assets"""
ebit = AnnualizedData(inputs = [Fundamentals.ebit_asof_date,
Fundamentals.ebit],
mask=mask)
assets = Fundamentals.total_assets.latest
return ebit / assets
@staticmethod
def CFOToAssets(mask):
"""Operating Cash Flows divided by Total Assets"""
cfo = AnnualizedData(inputs = [Fundamentals.operating_cash_flow_asof_date,
Fundamentals.operating_cash_flow],
mask=mask)
assets = Fundamentals.total_assets.latest
return cfo / assets
@staticmethod
def RetainedEarningsToAssets(mask):
"""Retained Earnings divided by Total Assets"""
retained_earnings = AnnualizedData(inputs = [Fundamentals.retained_earnings_asof_date,
Fundamentals.retained_earnings],
mask=mask)
assets = Fundamentals.total_assets.latest
return retained_earnings / assets
EFFICIENCY_FACTORS = {
'CFO To Assets' :EfficiencyFactors.CFOToAssets,
'Capex To Assets' :EfficiencyFactors.CapexToAssets,
'Capex To FCF' :EfficiencyFactors.CapexToFCF,
'Capex To Sales' :EfficiencyFactors.CapexToSales,
'EBIT To Assets' :EfficiencyFactors.EBITToAssets,
'Retained Earnings To Assets' :EfficiencyFactors.RetainedEarningsToAssets
}
efficiency_factors, t = factor_pipeline(EFFICIENCY_FACTORS)
print('Pipeline run time {:.2f} secs'.format(t))
efficiency_factors.info()
class RiskFactors:
@staticmethod
def LogMarketCap(mask):
"""Log of Market Capitalization log(Close Price * Shares Outstanding)"""
return np.log(MarketCap(mask=mask))
class DownsideRisk(CustomFactor):
"""Mean returns divided by std of 1yr daily losses (Sortino Ratio)"""
inputs = [USEquityPricing.close]
window_length = 252
def compute(self, today, assets, out, close):
ret = pd.DataFrame(close).pct_change()
out[:] = ret.mean().div(ret.where(ret<0).std())
@staticmethod
def MarketBeta(**kwargs):
"""Slope of 1-yr regression of price returns against index returns"""
return SimpleBeta(target=symbols('SPY'), regression_length=252)
class DownsideBeta(CustomFactor):
"""Slope of 1yr regression of returns on negative index returns"""
inputs = [USEquityPricing.close]
window_length = 252
def compute(self, today, assets, out, close):
t = len(close)
assets = pd.DataFrame(close).pct_change()
start_date = (today - pd.DateOffset(years=1)).strftime('%Y-%m-%d')
spy = get_pricing('SPY',
start_date=start_date,
end_date=today.strftime('%Y-%m-%d')).reset_index(drop=True)
spy_neg_ret = (spy
.close_price
.iloc[-t:]
.pct_change()
.pipe(lambda x: x.where(x<0)))
out[:] = assets.apply(lambda x: x.cov(spy_neg_ret)).div(spy_neg_ret.var())
class Vol3M(CustomFactor):
"""3-month Volatility: Standard deviation of returns over 3 months"""
inputs = [USEquityPricing.close]
window_length = 63
def compute(self, today, assets, out, close):
out[:] = np.log1p(pd.DataFrame(close).pct_change()).std()
RISK_FACTORS = {
'Log Market Cap' : RiskFactors.LogMarketCap,
'Downside Risk' : RiskFactors.DownsideRisk,
'Index Beta' : RiskFactors.MarketBeta,
#'Downside Beta' : RiskFactors.DownsideBeta,
'Volatility 3M' : RiskFactors.Vol3M,
}
risk_factors, t = factor_pipeline(RISK_FACTORS)
print('Pipeline run time {:.2f} secs'.format(t))
risk_factors.info()
def growth_pipeline():
revenue = AnnualizedData(inputs = [Fundamentals.total_revenue_asof_date,
Fundamentals.total_revenue],
mask=UNIVERSE)
eps = AnnualizedData(inputs = [Fundamentals.diluted_eps_earnings_reports_asof_date,
Fundamentals.diluted_eps_earnings_reports],
mask=UNIVERSE)
return Pipeline({'Sales': revenue,
'EPS': eps,
'Total Assets': Fundamentals.total_assets.latest,
'Net Debt': Fundamentals.net_debt.latest},
screen=UNIVERSE)
start_timer = time()
growth_factors = run_pipeline(growth_pipeline(), start_date=START, end_date=END)
for col in growth_factors.columns:
for month in [3, 12]:
new_col = col + ' Growth {}M'.format(month)
kwargs = {new_col: growth_factors[col].pct_change(month*MONTH).groupby(level=1).rank()}
growth_factors = growth_factors.assign(**kwargs)
print('Pipeline run time {:.2f} secs'.format(time() - start_timer))
growth_factors.info()
class QualityFactors:
@staticmethod
def AssetTurnover(mask):
"""Sales divided by average of year beginning and year end assets"""
assets = AnnualAvg(inputs=[Fundamentals.total_assets],
mask=mask)
sales = AnnualizedData([Fundamentals.total_revenue_asof_date,
Fundamentals.total_revenue], mask=mask)
return sales / assets
@staticmethod
def CurrentRatio(mask):
"""Total current assets divided by total current liabilities"""
assets = Fundamentals.current_assets.latest
liabilities = Fundamentals.current_liabilities.latest
return assets / liabilities
@staticmethod
def AssetToEquityRatio(mask):
"""Total current assets divided by common equity"""
assets = Fundamentals.current_assets.latest
equity = Fundamentals.common_stock.latest
return assets / equity
@staticmethod
def InterestCoverage(mask):
"""EBIT divided by interest expense"""
ebit = AnnualizedData(inputs = [Fundamentals.ebit_asof_date,
Fundamentals.ebit], mask=mask)
interest_expense = AnnualizedData(inputs = [Fundamentals.interest_expense_asof_date,
Fundamentals.interest_expense], mask=mask)
return ebit / interest_expense
@staticmethod
def DebtToAssetRatio(mask):
"""Total Debts divided by Total Assets"""
debt = Fundamentals.total_debt.latest
assets = Fundamentals.total_assets.latest
return debt / assets
@staticmethod
def DebtToEquityRatio(mask):
"""Total Debts divided by Common Stock Equity"""
debt = Fundamentals.total_debt.latest
equity = Fundamentals.common_stock.latest
return debt / equity
@staticmethod
def WorkingCapitalToAssets(mask):
"""Current Assets less Current liabilities (Working Capital) divided by Assets"""
working_capital = Fundamentals.working_capital.latest
assets = Fundamentals.total_assets.latest
return working_capital / assets
@staticmethod
def WorkingCapitalToSales(mask):
"""Current Assets less Current liabilities (Working Capital), divided by Sales"""
working_capital = Fundamentals.working_capital.latest
sales = AnnualizedData([Fundamentals.total_revenue_asof_date,
Fundamentals.total_revenue], mask=mask)
return working_capital / sales
class MertonsDD(CustomFactor):
"""Merton's Distance to Default """
inputs = [Fundamentals.total_assets,
Fundamentals.total_liabilities,
libor.value,
USEquityPricing.close]
window_length = 252
def compute(self, today, assets, out, tot_assets, tot_liabilities, r, close):
mertons = []
for col_assets, col_liabilities, col_r, col_close in zip(tot_assets.T, tot_liabilities.T,
r.T, close.T):
vol_1y = np.nanstd(col_close)
numerator = np.log(
col_assets[-1] / col_liabilities[-1]) + ((252 * col_r[-1]) - ((vol_1y ** 2) / 2))
mertons.append(numerator / vol_1y)
out[:] = mertons
QUALITY_FACTORS = {
'AssetToEquityRatio' : QualityFactors.AssetToEquityRatio,
'AssetTurnover' : QualityFactors.AssetTurnover,
'CurrentRatio' : QualityFactors.CurrentRatio,
'DebtToAssetRatio' : QualityFactors.DebtToAssetRatio,
'DebtToEquityRatio' : QualityFactors.DebtToEquityRatio,
'InterestCoverage' : QualityFactors.InterestCoverage,
'MertonsDD' : QualityFactors.MertonsDD,
'WorkingCapitalToAssets': QualityFactors.WorkingCapitalToAssets,
'WorkingCapitalToSales' : QualityFactors.WorkingCapitalToSales,
}
quality_factors, t = factor_pipeline(QUALITY_FACTORS)
print('Pipeline run time {:.2f} secs'.format(t))
quality_factors.info()
class PayoutFactors:
@staticmethod
def DividendPayoutRatio(mask):
"""Dividends Per Share divided by Earnings Per Share"""
dps = AnnualizedData(inputs = [Fundamentals.dividend_per_share_earnings_reports_asof_date,
Fundamentals.dividend_per_share_earnings_reports], mask=mask)
eps = AnnualizedData(inputs = [Fundamentals.basic_eps_earnings_reports_asof_date,
Fundamentals.basic_eps_earnings_reports], mask=mask)
return dps / eps
@staticmethod
def DividendGrowth(**kwargs):
"""Annualized percentage DPS change"""
return Fundamentals.dps_growth.latest
PAYOUT_FACTORS = {
'Dividend Payout Ratio': PayoutFactors.DividendPayoutRatio,
'Dividend Growth': PayoutFactors.DividendGrowth
}
payout_factors, t = factor_pipeline(PAYOUT_FACTORS)
print('Pipeline run time {:.2f} secs'.format(t))
payout_factors.info()
class ProfitabilityFactors:
@staticmethod
def GrossProfitMargin(mask):
"""Gross Profit divided by Net Sales"""
gross_profit = AnnualizedData([Fundamentals.gross_profit_asof_date,
Fundamentals.gross_profit], mask=mask)
sales = AnnualizedData([Fundamentals.total_revenue_asof_date,
Fundamentals.total_revenue], mask=mask)
return gross_profit / sales
@staticmethod
def NetIncomeMargin(mask):
"""Net income divided by Net Sales"""
net_income = AnnualizedData([Fundamentals.net_income_income_statement_asof_date,
Fundamentals.net_income_income_statement], mask=mask)
sales = AnnualizedData([Fundamentals.total_revenue_asof_date,
Fundamentals.total_revenue], mask=mask)
return net_income / sales
PROFITABILITY_FACTORS = {
    'Gross Profit Margin': ProfitabilityFactors.GrossProfitMargin,
    'Net Income Margin': ProfitabilityFactors.NetIncomeMargin,
    # wrapped in lambdas so factor_pipeline can call them with a mask argument
    'Return on Equity': lambda mask: Fundamentals.roe.latest,
    'Return on Assets': lambda mask: Fundamentals.roa.latest,
    'Return on Invested Capital': lambda mask: Fundamentals.roic.latest
}
profitability_factors, t = factor_pipeline(PROFITABILITY_FACTORS)
print('Pipeline run time {:.2f} secs'.format(t))
profitability_factors.info()
We will test predictions for various lookahead periods to identify the holding periods that generate the best predictability, measured by the information coefficient.
More specifically, we compute returns for 1, 5, 10, 20, and 60 days using the built-in Returns factor, resulting in over 25,000 observations for the universe of 50 stocks over two years of approximately 252 trading days each.
lookahead = [1, 5, 10, 20, 60]
returns = run_pipeline(Pipeline({'Returns{}D'.format(i): Returns(inputs=[USEquityPricing.close],
window_length=i+1, mask=UNIVERSE) for i in lookahead},
screen=UNIVERSE),
start_date=START,
end_date=END)
return_cols = ['Returns{}D'.format(i) for i in lookahead]
returns.info()
We will use over 50 features that cover a broad range of factors based on market, fundamental, and alternative data. The notebook also includes custom transformations to convert fundamental data, which is typically reported at quarterly frequency, to rolling annual totals or averages in order to avoid excessive seasonal fluctuations.
Once the factors have been computed, we combine them using pd.concat(), assign index names, and create a categorical variable that identifies the asset for each data point:
data = pd.concat([returns,
value_factors,
momentum_factors,
quality_factors,
payout_factors,
growth_factors,
efficiency_factors,
risk_factors], axis=1).sortlevel()
data.index.names = ['date', 'asset']
data['stock'] = data.index.get_level_values('asset').map(lambda x: x.asset_name)
data.info()
# Create sorted dataframe of features with missing_count
missing_values0 = data.isnull().sum(axis=0).reset_index()
missing_values0.columns = ['column_name', 'missing_count']
missing_values0 = missing_values0.loc[missing_values0['missing_count']>0]
missing_values0 = missing_values0.sort_values(by='missing_count')
# Get percentage of total NaNs per feature
total0 = data.isnull().sum().sort_values(ascending=False)
percent0 = (data.isnull().sum()/data.isnull().count()).sort_values(ascending=False)
missing_data0 = pd.concat([total0, percent0], axis=1,join='outer', keys=['Total Missing Count', '% of Total Observations'])
missing_data0.index.name = 'Numeric Feature'
missing_data0.head(len(data.columns))
ind0 = np.arange(missing_values0.shape[0])
width0 = 0.1
fig, ax = plt.subplots(figsize=(13,5))
colors0 = sns.color_palette('Set2', len(ind0))
rects0 = ax.bar(ind0, missing_values0.missing_count.values, color=colors0)
ax.set_xticks(ind0)
ax.set_xticklabels(missing_values0.column_name.values, rotation='vertical')
ax.set_ylabel("Count")
ax.set_title("Missing Observations Count")
ax.margins(0.001)
plt.show()
Next, we remove rows and columns that are missing more than 20 percent of their observations, resulting in the loss of about six percent of the rows and five columns:
rows_before, cols_before = data.shape
data = (data
.dropna(axis=1, thresh=int(len(data)*.8))
.dropna(thresh=int(len(data.columns) * .8)))
#data = data.fillna(data.median())
data = data.bfill().ffill()
rows_after, cols_after = data.shape
print('{:,d} rows and {:,d} columns dropped'.format(rows_before-rows_after, cols_before-cols_after))
At this point, we have 51 features and the categorical identifier of the stock:
data.sort_index(1).info()
First, let's take a look at the individual distributions of all our features.
data.hist(bins=25, figsize=(22,22))
plt.show()
It is always a good idea to check the relationship between your features and the target variable. Here, we will plot each feature against the 60-day forward return, together with the r2 score, the information coefficient (IC), and its p-value for each feature.
tmp = data.drop(['Returns1D','Returns5D','Returns10D','Returns20D'], axis=1)
tmp.reset_index(level=['asset'], inplace=True, drop=True)
tmp.head()
def r2(x, y):
return pearsonr(x, y)[0] ** 2
count = 0
for i, feature in enumerate(list(tmp), 1):
count += 1
if(feature == 'Returns60D'):
print()
else:
print('{} # {}'.format(feature, count))
plt.figure(figsize=(8,5))
cm = plt.get_cmap('jet')
colors = np.linspace(0.1, 1, len(tmp))
sc = plt.scatter(tmp[feature], tmp['Returns60D'], s=25, c=colors, cmap=cm,
edgecolor='k', alpha=0.3, label='Price Data')
j = sns.regplot(tmp[feature], tmp['Returns60D'], data=tmp, scatter=False,
line_kws={'color':'k','lw':2, 'linestyle':'dashed'})
cb = plt.colorbar(sc)
cb.ax.set_yticklabels([str(p) for p in tmp[::len(tmp)//9].index],
fontdict = {'fontsize': 10,
'fontweight': 'medium'})
plt.xlabel('{}'.format(feature), size=10, labelpad=10, fontsize=10, fontweight='medium')
plt.ylabel('Returns60D', size=10, labelpad=10, fontsize=10, fontweight='medium')
plt.grid(False)
ic, pval = spearmanr(tmp[feature], tmp['Returns60D'])
R2 = r2(tmp[feature], tmp['Returns60D'])
plt.title('r2 = {}, IC = {}, P-Value = {}'.format(round(R2,4), round(ic,4), pval))
for j in range(2):
plt.tick_params(axis='x', labelsize=10)
plt.tick_params(axis='y', labelsize=10)
plt.show()
if(count == len(tmp.columns)-1):
break
For linear regression models, it is important to explore the correlation among the features to identify multicollinearity issues, and to check the correlation between the features and the target. The notebook contains a seaborn clustermap that shows the hierarchical structure of the feature correlation matrix. It identifies a small number of highly correlated clusters.
g = sns.clustermap(data.drop(['stock'] + return_cols, axis=1).corr(), square=True)
g.ax_heatmap.set_yticklabels(g.ax_heatmap.get_yticklabels(), rotation=0)
plt.title('Correlation of all_features',y=1, x=5,size=20)
plt.show();
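Beyond pairwise correlations, we can quantify multicollinearity with variance inflation factors. The following is a minimal sketch using statsmodels on the same feature set as the clustermap; the threshold of 10 mentioned in the comment is a common rule of thumb, not part of the notebook:
import statsmodels.api as sm
from statsmodels.stats.outliers_influence import variance_inflation_factor
# same feature set as the clustermap above
feature_data = data.drop(['stock'] + return_cols, axis=1).dropna()
vif_input = sm.add_constant(feature_data)  # VIF regressions need an intercept
# VIF_i = 1 / (1 - R2_i), where R2_i regresses feature i on all other features
vif = pd.Series([variance_inflation_factor(vif_input.values, i)
                 for i in range(1, vif_input.shape[1])],  # skip the constant
                index=feature_data.columns, name='VIF')
# values above ~10 are commonly flagged as problematic
print(vif.sort_values(ascending=False).head(10))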
We need to convert the categorical stock variable into a numeric format so that the linear regression can process it. For this purpose, we use dummy encoding, which creates an individual column for each category level and flags the presence of that level with an entry of 1, and 0 otherwise. The pandas function get_dummies() automates dummy encoding. It detects and properly converts columns of type object, as illustrated next. If you need dummy variables for columns containing integers, for instance, you can identify them using the keyword columns (a hypothetical sketch follows the next cell):
X = pd.get_dummies(data.drop(return_cols, axis=1), prefix_sep='_')
X.info()
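As a hypothetical illustration of the columns keyword (the sector_code column below is made up and not part of this dataset):
# explicitly dummy-encode an integer-coded column
sector_demo = pd.DataFrame({'sector_code': [101, 102, 101],
                            'close': [10., 12., 9.]})
pd.get_dummies(sector_demo, columns=['sector_code'], prefix='sector')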
The goal is to predict returns over a given holding period. Hence, we need to align each observation of the features with the return realized over the subsequent 1, 5, 10, 20, or 60 days for each equity. We achieve this by combining the pandas .groupby() method with the .shift() method as follows:
y = data.loc[:, return_cols]
shifted_y = []
for col in y.columns:
t = int(re.search(r'\d+', col).group(0))
shifted_y.append(y.groupby(level='asset')['Returns{}D'.format(t)].shift(-t).to_frame(col))
y = pd.concat(shifted_y, axis=1)
y.info()
ax = sns.boxplot(y[return_cols])
ax.set_title('Return Distributions');
We can estimate a linear regression model using OLS with statsmodels. We select a forward return, for example for a 1-day holding period, remove outliers below the 2.5th and above the 97.5th percentiles, and fit the model accordingly:
target = 'Returns1D'
model_data = pd.concat([y[[target]], X], axis=1).dropna()
model_data = model_data[model_data[target].between(model_data[target].quantile(.025),
model_data[target].quantile(.975))]
model = OLS(endog=model_data[target], exog=model_data.drop(target, axis=1))
trained_model = model.fit()
trained_model.summary()
We do not reproduce the full summary here because of the large number of variables; it is available in the notebook. The diagnostic statistics show that, given the high p-value on the Jarque-Bera statistic, the hypothesis that the residuals are normally distributed cannot be rejected.
However, the Durbin-Watson statistic is low at 1.4, so we can comfortably reject the null hypothesis of no autocorrelation at the 5% level. Hence, the standard errors are likely positively correlated. If our goal were to understand which factors are significantly associated with forward returns, we would need to rerun the regression using robust standard errors (a parameter of the statsmodels .fit() method), or use a different method altogether, such as a panel model that allows for a more complex error covariance structure.
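As a minimal sketch of that adjustment (not part of the original notebook), we could refit the same model with heteroskedasticity- and autocorrelation-consistent (HAC) standard errors via the cov_type argument of .fit(); the five-day lag length below is an illustrative assumption:
# illustrative only: same design matrix, but HAC (Newey-West) standard errors
robust_model = OLS(endog=model_data[target],
                   exog=model_data.drop(target, axis=1))
robust_result = robust_model.fit(cov_type='HAC', cov_kwds={'maxlags': 5})
robust_result.summary()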
target = 'Returns5D'
model_data = pd.concat([y[[target]], X], axis=1).dropna()
model_data = model_data[model_data[target].between(model_data[target].quantile(.025),
model_data[target].quantile(.975))]
model = OLS(endog=model_data[target], exog=model_data.drop(target, axis=1))
trained_model = model.fit()
trained_model.summary()
target = 'Returns10D'
model_data = pd.concat([y[[target]], X], axis=1).dropna()
model_data = model_data[model_data[target].between(model_data[target].quantile(.025),
model_data[target].quantile(.975))]
model = OLS(endog=model_data[target], exog=model_data.drop(target, axis=1))
trained_model = model.fit()
trained_model.summary()
target = 'Returns20D'
model_data = pd.concat([y[[target]], X], axis=1).dropna()
model_data = model_data[model_data[target].between(model_data[target].quantile(.025),
model_data[target].quantile(.975))]
model = OLS(endog=model_data[target], exog=model_data.drop(target, axis=1))
trained_model = model.fit()
trained_model.summary()
target = 'Returns60D'
model_data = pd.concat([y[[target]], X], axis=1).dropna()
model_data = model_data[model_data[target].between(model_data[target].quantile(.025),
model_data[target].quantile(.975))]
model = OLS(endog=model_data[target], exog=model_data.drop(target, axis=1))
trained_model = model.fit()
trained_model.summary()
Since sklearn is tailored towards prediction, we will evaluate the linear regression model based on its predictive performance using cross-validation.
Our data consists of grouped time series, which requires a custom cross-validation function to provide train and test indices that ensure the test data immediately follows the training data for each equity, so that we do not inadvertently introduce look-ahead bias or leakage.
We achieve this with the following function, which returns a generator yielding pairs of train and test dates. The train dates guarantee a minimum length for the training period, and the number of pairs depends on the parameter nfolds. The distinct test periods do not overlap and are located at the end of the period available in the data. After a test period has been used, it becomes part of the training data, which grows in size accordingly:
def time_series_split(d, nfolds=5, min_train=21):
"""Generate train/test dates for nfolds
with at least min_train train obs
"""
train_dates = d[:min_train].tolist()
    n = int(len(d)/(nfolds + 1)) + 1
test_folds = [d[i:i + n] for i in range(min_train, len(d), n)]
for test_dates in test_folds:
if len(train_dates) > min_train:
yield train_dates, test_dates
train_dates.extend(test_dates)
We need to select the appropriate return series (we will use a 60-day holding period) and remove outliers. We will also convert returns to log returns as follows:
target = 'Returns60D'
outliers = .01
model_data = pd.concat([y[[target]], X], axis=1).dropna().reset_index('asset', drop=True)
model_data = model_data[model_data[target].between(*model_data[target].quantile([outliers, 1-outliers]).values)]
model_data[target] = np.log1p(model_data[target])
features = model_data.drop(target, axis=1).columns
dates = model_data.index.unique()
print(model_data.info())
model_data[target].describe()
idx = pd.IndexSlice
We will use 250 folds so that each test period covers roughly two days of forward returns following the historical training data, which gradually increases in length.
nfolds = 250
for train_dates, test_dates in time_series_split(dates, nfolds=nfolds):
print('{} train_dates from {} - {}'.format(len(train_dates), str(train_dates[0]),
str(train_dates[len(train_dates)-1])))
print('{} test_dates from {} - {}'.format(len(test_dates),str(test_dates[0]),
str(test_dates[len(test_dates)-1])))
print()
Each iteration obtains the appropriate training and test dates from our custom cross-validation function, selects the corresponding features and targets, and then trains and predicts accordingly.
We capture the root mean squared error as well as the Spearman rank correlation between actual and predicted values:
nfolds = 250
lr = LinearRegression()
test_results, result_idx, preds = [], [], pd.DataFrame()
for train_dates, test_dates in time_series_split(dates, nfolds=nfolds):
X_train = model_data.loc[idx[train_dates], features]
y_train = model_data.loc[idx[train_dates], target]
lr.fit(X=X_train, y=y_train)
X_test = model_data.loc[idx[test_dates], features]
y_test = model_data.loc[idx[test_dates], target]
y_pred = lr.predict(X_test)
rmse = np.sqrt(mean_squared_error(y_pred=y_pred, y_true=y_test))
ic, pval = spearmanr(y_pred, y_test)
test_results.append([rmse, ic, pval])
preds = preds.append(y_test.to_frame('actuals').assign(predicted=y_pred))
result_idx.append(train_dates[-1])
test_result = pd.DataFrame(test_results, columns=['rmse', 'ic', 'pval'], index=result_idx)
We have captured the test predictions from the 250 folds and can compute both the overall averages and 21-day rolling averages of the RMSE and the information coefficient:
fig, axes = plt.subplots(nrows=2)
rolling_result = test_result.rolling(21).mean().dropna()
rolling_result[['ic', 'pval']].plot(ax=axes[0], title='Information Coefficient')
axes[0].axhline(test_result.ic.mean(), lw=1, ls='--', color='k')
rolling_result[['rmse']].plot(ax=axes[1], title='Root Mean Squared Error')
axes[1].axhline(test_result.rmse.mean(), lw=1, ls='--', color='k')
plt.tight_layout();
For the entire period, we see that the information coefficient, measured by the rank correlation of actual and predicted returns, is positive and statistically significant:
preds_cleaned = preds[(preds.predicted.between(*preds.predicted.quantile([.001, .999]).values))]
sns.jointplot(x='actuals', y='predicted', data=preds_cleaned, stat_func=spearmanr, kind='reg');
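To make that figure explicit, we can also compute the pooled rank correlation directly from the captured predictions (the same spearmanr call used inside the loop, applied to all folds at once):
# overall IC pooled across all test folds
ic_all, pval_all = spearmanr(preds.actuals, preds.predicted)
print('Overall IC: {:.4f} (p-value: {:.6f})'.format(ic_all, pval_all))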
For the ridge regression, we need to tune the regularization parameter with the keyword alpha, which corresponds to the λ we used previously. We will try 11 values from 10^-5 to 10^5 in logarithmic steps.
The scale sensitivity of the ridge penalty requires us to standardize the inputs using the StandardScaler. Note that we always learn the mean and the standard deviation from the training set using the .fit_transform() method and then apply these learned parameters to the test set using the .transform() method.
nfolds = 250
alphas = np.logspace(-5, 5, 11)
scaler = StandardScaler()
ridge_result, ridge_coeffs = pd.DataFrame(), pd.DataFrame()
for i, alpha in enumerate(alphas):
    print(alpha)
coeffs, test_results = [], []
lr_ridge = Ridge(alpha=alpha)
for train_dates, test_dates in time_series_split(dates, nfolds=nfolds):
X_train = model_data.loc[idx[train_dates], features]
y_train = model_data.loc[idx[train_dates], target]
lr_ridge.fit(X=scaler.fit_transform(X_train), y=y_train)
coeffs.append(lr_ridge.coef_)
X_test = model_data.loc[idx[test_dates], features]
y_test = model_data.loc[idx[test_dates], target]
y_pred = lr_ridge.predict(scaler.transform(X_test))
rmse = np.sqrt(mean_squared_error(y_pred=y_pred, y_true=y_test))
ic, pval = spearmanr(y_pred, y_test)
test_results.append([train_dates[-1], rmse, ic, pval, alpha])
preds = preds.append(y_test)
test_results = pd.DataFrame(test_results, columns=['date', 'rmse', 'ic', 'pval', 'alpha'])
ridge_result = ridge_result.append(test_results)
ridge_coeffs[alpha] = np.mean(coeffs, axis=0)
ridge_result.describe()
plt.figure(figsize=(8,5))
sns.distplot(ridge_result.pval, bins=30, norm_hist=True);
ridge_result_sig = ridge_result[(ridge_result.pval < .05) & (ridge_result.alpha.between(10**-5, 10**5))]
ridge_result_sig_alpha = ridge_result_sig.groupby('alpha')
ridge_coeffs_main = ridge_coeffs.filter(ridge_result_sig.alpha.unique())
We can now plot the information coefficient obtained for each hyperparameter value and also visualize how the coefficient values evolve as the regularization increases. The results show that we get the highest IC value for λ = 10. For this level of regularization, the right-hand panel reveals that the coefficients have already been shrunk significantly compared to the (almost) unconstrained model with λ = 10^-5:
ridge_result.info()
best_ic = ridge_result_sig_alpha['ic'].mean().max()
best_alpha = ridge_result_sig_alpha['ic'].mean().idxmax()
fig, axes = plt.subplots(ncols=2, sharex=True)
ridge_result.groupby('alpha')['ic'].mean().plot(logx=True, title='Information Coefficient', ax=axes[0])
axes[0].axhline(ridge_result.groupby('alpha').ic.mean().median())
axes[0].axvline(x=ridge_result.groupby('alpha').ic.mean().idxmax(), c='darkgrey', ls='--')
axes[0].set_xlabel('Regularization')
axes[0].set_ylabel('Information Coefficient')
ridge_coeffs_main.T.plot(legend=False, logx=True, title='Ridge Path', ax=axes[1])
axes[1].set_xlabel('Regularization')
axes[1].set_ylabel('Coefficients')
axes[1].axvline(x=ridge_result.groupby('alpha').ic.mean().idxmax(), c='darkgrey', ls='--')
fig.tight_layout();
Because we standardized the inputs, we can draw conclusions about the relative importance of the factors by comparing the absolute magnitude of their coefficients. The most relevant coefficients are:
model_coeffs = ridge_coeffs_main.loc[:, best_alpha]
model_coeffs.index = features
model_coeffs.abs().sort_values().plot.barh(title='Top Factors', figsize=(10,23));
ax = sns.boxplot(y='ic', x='alpha', data=ridge_result_sig)
plt.xticks(rotation=90);
The lasso implementation looks very similar to the ridge model we just ran. The main difference is that the lasso needs to arrive at a solution using iterative coordinate descent, whereas ridge regression can rely on a closed-form solution:
nfolds = 250
alphas = np.logspace(-8, -2, 13)
scaler = StandardScaler()
lasso_results, lasso_coeffs = pd.DataFrame(), pd.DataFrame()
for i, alpha in enumerate(alphas):
    print(i)
coeffs, test_results = [], []
lr_lasso = Lasso(alpha=alpha)
for i, (train_dates, test_dates) in enumerate(time_series_split(dates, nfolds=nfolds)):
X_train = model_data.loc[idx[train_dates], features]
y_train = model_data.loc[idx[train_dates], target]
lr_lasso.fit(X=scaler.fit_transform(X_train), y=y_train)
X_test = model_data.loc[idx[test_dates], features]
y_test = model_data.loc[idx[test_dates], target]
y_pred = lr_lasso.predict(scaler.transform(X_test))
rmse = np.sqrt(mean_squared_error(y_pred=y_pred, y_true=y_test))
ic, pval = spearmanr(y_pred, y_test)
coeffs.append(lr_lasso.coef_)
test_results.append([train_dates[-1], rmse, ic, pval, alpha])
test_results = pd.DataFrame(test_results, columns=['date', 'rmse', 'ic', 'pval', 'alpha'])
lasso_results = lasso_results.append(test_results)
lasso_coeffs[alpha] = np.mean(coeffs, axis=0)
lasso_results.groupby('alpha').mean()
ax = sns.boxplot(y='ic', x='alpha', data=lasso_results)
plt.xticks(rotation=90);
As before, we can plot the average information coefficient for all test sets used during cross-validation. We see again that regularization improves the IC over the unconstrained model, delivering the best out-of-sample result at a level of λ = 10^-5. The optimal regularization value is quite different from the ridge case because the penalty consists of the sum of the absolute, not the squared, values of the relatively small coefficients. We can also see that, for this regularization level, the coefficients have been similarly shrunk, as in the ridge regression case:
fig, axes = plt.subplots(ncols=2, sharex=True)
lasso_results.groupby('alpha')['ic'].mean().plot(logx=True, title='Information Coefficient', ax=axes[0])
axes[0].axhline(lasso_results.groupby('alpha')['ic'].mean().median())
axes[0].axvline(x=lasso_results.groupby('alpha')['ic'].mean().idxmax(), c='darkgrey', ls='--')
axes[0].set_xlabel('Regularization')
axes[0].set_ylabel('Information Coefficient')
lasso_coeffs.T.plot(legend=False, logx=True, title='Lasso Path', ax=axes[1])
axes[1].set_xlabel('Regularization')
axes[1].set_ylabel('Coefficients')
axes[1].axvline(x=lasso_results.groupby('alpha')['ic'].mean().idxmax(), c='darkgrey', ls='--')
fig.tight_layout();
In sum, ridge and lasso produce similar predictive results. Ridge often computes faster, but the lasso additionally performs continuous feature subset selection by gradually shrinking coefficients to exactly zero, thereby eliminating features.
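That sparsity claim can be checked directly from the coefficient paths stored above. The following is a minimal sketch (assuming the ridge_coeffs and lasso_coeffs frames built in the loops above); since the stored values are fold averages, we count near-zero rather than exactly zero coefficients:
# count coefficients shrunk to (near) zero at each regularization level;
# lasso drives coefficients exactly to zero, ridge only shrinks them towards zero
print('Ridge: near-zero coefficients per alpha')
print((ridge_coeffs.abs() < 1e-10).sum())
print('Lasso: near-zero coefficients per alpha')
print((lasso_coeffs.abs() < 1e-10).sum())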