The notebook linear_regression.ipynb contains examples that predict stock prices using OLS with statsmodels and sklearn, as well as ridge and lasso models. It is designed to run on the Quantopian research platform.
import pandas as pd
import numpy as np
from time import time
import talib
import re
from statsmodels.api import OLS
from sklearn.metrics import mean_squared_error
from scipy.stats import spearmanr, pearsonr
from sklearn.linear_model import LinearRegression, Ridge, RidgeCV, Lasso, LassoCV, LogisticRegression
from sklearn.preprocessing import StandardScaler
from quantopian.research import run_pipeline
from quantopian.pipeline import Pipeline, factors, filters, classifiers
from quantopian.pipeline.data.builtin import USEquityPricing
from quantopian.pipeline.factors import (Latest,
Returns,
AverageDollarVolume,
SimpleMovingAverage,
EWMA,
BollingerBands,
CustomFactor,
MarketCap,
SimpleBeta)
from quantopian.pipeline.filters import QTradableStocksUS, StaticAssets
from quantopian.pipeline.data.quandl import fred_usdontd156n as libor
from empyrical import max_drawdown, sortino_ratio
import seaborn as sns
import matplotlib.pyplot as plt
################
# Fundamentals #
################
# Morningstar fundamentals (2002 - Ongoing)
# https://www.quantopian.com/help/fundamentals
from quantopian.pipeline.data import Fundamentals
#####################
# Analyst Estimates #
#####################
# Earnings Surprises - Zacks (27 May 2006 - Ongoing)
# https://www.quantopian.com/data/zacks/earnings_surprises
from quantopian.pipeline.data.zacks import EarningsSurprises
from quantopian.pipeline.factors.zacks import BusinessDaysSinceEarningsSurprisesAnnouncement
##########
# Events #
##########
# Buyback Announcements - EventVestor (01 Jun 2007 - Ongoing)
# https://www.quantopian.com/data/eventvestor/buyback_auth
from quantopian.pipeline.data.eventvestor import BuybackAuthorizations
from quantopian.pipeline.factors.eventvestor import BusinessDaysSinceBuybackAuth
# CEO Changes - EventVestor (01 Jan 2007 - Ongoing)
# https://www.quantopian.com/data/eventvestor/ceo_change
from quantopian.pipeline.data.eventvestor import CEOChangeAnnouncements
# Dividends - EventVestor (01 Jan 2007 - Ongoing)
# https://www.quantopian.com/data/eventvestor/dividends
from quantopian.pipeline.data.eventvestor import (
DividendsByExDate,
DividendsByPayDate,
DividendsByAnnouncementDate,
)
from quantopian.pipeline.factors.eventvestor import (
BusinessDaysSincePreviousExDate,
BusinessDaysUntilNextExDate,
BusinessDaysSinceDividendAnnouncement,
)
# Earnings Calendar - EventVestor (01 Jan 2007 - Ongoing)
# https://www.quantopian.com/data/eventvestor/earnings_calendar
from quantopian.pipeline.data.eventvestor import EarningsCalendar
from quantopian.pipeline.factors.eventvestor import (
BusinessDaysUntilNextEarnings,
BusinessDaysSincePreviousEarnings
)
# 13D Filings - EventVestor (01 Jan 2007 - Ongoing)
# https://www.quantopian.com/data/eventvestor/_13d_filings
from quantopian.pipeline.data.eventvestor import _13DFilings
from quantopian.pipeline.factors.eventvestor import BusinessDaysSince13DFilingsDate
#############
# Sentiment #
#############
# News Sentiment - Sentdex Sentiment Analysis (15 Oct 2012 - Ongoing)
# https://www.quantopian.com/data/sentdex/sentiment
from quantopian.pipeline.data.sentdex import sentiment
We need to select a universe of equities and a time horizon, build and transform alpha factors that we will use as features, calculate forward returns that we aim to predict, and potentially clean our data.
# trading days per period
MONTH = 21
YEAR = 12 * MONTH
START = '2017-01-01'
END = '2018-12-31'
We will use equity data for 2017 and 2018 from a custom Q50US universe that uses built-in filters, factors, and classifiers to select the 50 stocks with the highest average dollar volume over the last 200 trading days, subject to additional default criteria (see the Quantopian docs linked on GitHub for details). The universe updates dynamically based on the filter criteria so that, while there are roughly 50 stocks at any given point in time, there may be more than 50 distinct equities in the sample:
def Q50US():
return filters.make_us_equity_universe(
target_size=50,
rankby=factors.AverageDollarVolume(window_length=200),
mask=filters.default_us_equity_universe_mask(),
groupby=classifiers.fundamentals.Sector(),
max_group_weight=0.3,
smoothing_func=lambda f: f.downsample('month_start'),
)
# UNIVERSE = StaticAssets(symbols(['MSFT', 'AAPL']))
UNIVERSE = Q50US()
class AnnualizedData(CustomFactor):
    """Sum the last four quarterly values reported in the trailing window"""
    window_length = 260
    def compute(self, today, assets, out, asof_date, values):
        for asset in range(len(assets)):
            # unique asof dates indicate the availability of new figures
            _, filing_dates = np.unique(asof_date[:, asset], return_index=True)
            quarterly_values = values[filing_dates[-4:], asset]
            # ignore annual windows with fewer than 4 quarterly data points
            if np.count_nonzero(~np.isnan(quarterly_values)) != 4:
                out[asset] = np.nan
            else:
                out[asset] = np.sum(quarterly_values)

class AnnualAvg(CustomFactor):
    """Average of the values at the start and end of the trailing year"""
    window_length = 252
    def compute(self, today, assets, out, values):
        out[:] = (values[0] + values[-1]) / 2
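The np.unique(..., return_index=True) logic above is worth a brief illustration: because an asset's as-of date only changes when a new filing arrives, the first-occurrence indices mark the rows on which fresh quarterly figures became available. A minimal sketch with hypothetical dates:
# hypothetical as-of dates: each value repeats daily until the next quarterly filing
asof = np.array(['2017-03-31'] * 60 + ['2017-06-30'] * 65 +
                ['2017-09-30'] * 62 + ['2017-12-31'] * 73)
_, filing_dates = np.unique(asof, return_index=True)
print(filing_dates)  # [  0  60 125 187]: first row of each new quarter
# values[filing_dates[-4:], asset] picks the four most recent quarterly figures,
# which compute() then sums into a rolling annual total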
def run_pipeline_chunks(pipe, start_date, end_date, chunks_len=None):
    """Run a pipeline in 26-week chunks to limit memory usage"""
    chunks = []
    current = pd.Timestamp(start_date)
    end = pd.Timestamp(end_date)
    step = pd.Timedelta(weeks=26) if chunks_len is None else chunks_len
    start_pipeline_timer = time()
    while current <= end:
        current_end = current + step
        if current_end > end:
            current_end = end
        start_timer = time()
        print('Running pipeline: {} - {}'.format(current, current_end))
        results = run_pipeline(pipe, current.strftime("%Y-%m-%d"), current_end.strftime("%Y-%m-%d"))
        chunks.append(results)
        # the chunk may not end exactly on current_end (non-trading days),
        # so advance from the last date actually returned
        current_end = results.index.get_level_values(0)[-1].tz_localize(None)
        current = current_end + pd.Timedelta(days=1)
        end_timer = time()
        print("Time to run this chunk of the pipeline: {:.2f} secs".format(end_timer - start_timer))
    end_pipeline_timer = time()
    print("Time to run the entire pipeline: {:.2f} secs".format(end_pipeline_timer - start_pipeline_timer))
    return pd.concat(chunks)
def factor_pipeline(factor_dict):
    # parameter renamed from factors to avoid shadowing the imported factors module
    start = time()
    pipe = Pipeline({k: v(mask=UNIVERSE).rank() for k, v in factor_dict.items()},
                    screen=UNIVERSE)
    result = run_pipeline_chunks(pipe, start_date=START, end_date=END)
    return result, time() - start
class ValueFactors:
"""Definitions of factors for cross-sectional trading algorithms"""
@staticmethod
def PriceToSalesTTM(**kwargs):
"""Last closing price divided by sales per share"""
return Fundamentals.ps_ratio.latest
@staticmethod
def PriceToEarningsTTM(**kwargs):
"""Closing price divided by earnings per share (EPS)"""
return Fundamentals.pe_ratio.latest
@staticmethod
def PriceToDilutedEarningsTTM(mask):
"""Closing price divided by diluted EPS"""
last_close = USEquityPricing.close.latest
diluted_eps = AnnualizedData(inputs = [Fundamentals.diluted_eps_earnings_reports_asof_date,
Fundamentals.diluted_eps_earnings_reports],
mask=mask)
return last_close / diluted_eps
@staticmethod
def PriceToForwardEarnings(**kwargs):
"""Price to Forward Earnings"""
return Fundamentals.forward_pe_ratio.latest
@staticmethod
def DividendYield(**kwargs):
"""Dividends per share divided by closing price"""
return Fundamentals.trailing_dividend_yield.latest
@staticmethod
def PriceToFCF(mask):
"""Price to Free Cash Flow"""
last_close = USEquityPricing.close.latest
fcf_share = AnnualizedData(inputs = [Fundamentals.fcf_per_share_asof_date,
Fundamentals.fcf_per_share],
mask=mask)
return last_close / fcf_share
@staticmethod
def PriceToOperatingCashflow(mask):
"""Last Close divided by Operating Cash Flows"""
last_close = USEquityPricing.close.latest
cfo_per_share = AnnualizedData(inputs = [Fundamentals.cfo_per_share_asof_date,
Fundamentals.cfo_per_share],
mask=mask)
return last_close / cfo_per_share
@staticmethod
def PriceToBook(mask):
"""Closing price divided by book value"""
last_close = USEquityPricing.close.latest
book_value_per_share = AnnualizedData(inputs = [Fundamentals.book_value_per_share_asof_date,
Fundamentals.book_value_per_share],
mask=mask)
return last_close / book_value_per_share
@staticmethod
def EVToFCF(mask):
"""Enterprise Value divided by Free Cash Flows"""
fcf = AnnualizedData(inputs = [Fundamentals.free_cash_flow_asof_date,
Fundamentals.free_cash_flow],
mask=mask)
return Fundamentals.enterprise_value.latest / fcf
@staticmethod
def EVToEBITDA(mask):
"""Enterprise Value to Earnings Before Interest, Taxes, Deprecation and Amortization (EBITDA)"""
ebitda = AnnualizedData(inputs = [Fundamentals.ebitda_asof_date,
Fundamentals.ebitda],
mask=mask)
return Fundamentals.enterprise_value.latest / ebitda
@staticmethod
def EBITDAYield(mask):
"""EBITDA divided by latest close"""
ebitda = AnnualizedData(inputs = [Fundamentals.ebitda_asof_date,
Fundamentals.ebitda],
mask=mask)
return USEquityPricing.close.latest / ebitda
VALUE_FACTORS = {
'DividendYield' : ValueFactors.DividendYield,
'EBITDAYield' : ValueFactors.EBITDAYield,
'EVToEBITDA' : ValueFactors.EVToEBITDA,
'EVToFCF' : ValueFactors.EVToFCF,
'PriceToBook' : ValueFactors.PriceToBook,
'PriceToDilutedEarningsTTM': ValueFactors.PriceToDilutedEarningsTTM,
'PriceToEarningsTTM' : ValueFactors.PriceToEarningsTTM,
'PriceToFCF' : ValueFactors.PriceToFCF,
'PriceToForwardEarnings' : ValueFactors.PriceToForwardEarnings,
'PriceToOperatingCashflow' : ValueFactors.PriceToOperatingCashflow,
'PriceToSalesTTM' : ValueFactors.PriceToSalesTTM,
}
value_factors, t = factor_pipeline(VALUE_FACTORS)
print('Pipeline run time {:.2f} secs'.format(t))
value_factors.info()
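As a sanity check on the dynamic universe described above, we can count how many distinct equities appear in the sample versus how many are held on a typical day. A quick sketch using the frame just computed:
# distinct assets over the full sample vs. the cross-section on a single date
n_distinct = value_factors.index.get_level_values(1).nunique()
assets_per_day = value_factors.groupby(level=0).size()
print('Distinct assets: {} | median per day: {:.0f}'.format(n_distinct, assets_per_day.median()))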
class MomentumFactors:
"""Custom Momentum Factors"""
class PercentAboveLow(CustomFactor):
"""Percentage of current close above low
in lookback window of window_length days
"""
inputs = [USEquityPricing.close]
window_length = 252
def compute(self, today, assets, out, close):
out[:] = close[-1] / np.min(close, axis=0) - 1
class PercentBelowHigh(CustomFactor):
"""Percentage of current close below high
in lookback window of window_length days
"""
inputs = [USEquityPricing.close]
window_length = 252
def compute(self, today, assets, out, close):
out[:] = close[-1] / np.max(close, axis=0) - 1
@staticmethod
def make_dx(timeperiod=14):
class DX(CustomFactor):
"""Directional Movement Index"""
inputs = [USEquityPricing.high,
USEquityPricing.low,
USEquityPricing.close]
window_length = timeperiod + 1
def compute(self, today, assets, out, high, low, close):
out[:] = [talib.DX(high[:, i],
low[:, i],
close[:, i],
timeperiod=timeperiod)[-1]
for i in range(len(assets))]
return DX
@staticmethod
def make_mfi(timeperiod=14):
class MFI(CustomFactor):
"""Money Flow Index"""
inputs = [USEquityPricing.high,
USEquityPricing.low,
USEquityPricing.close,
USEquityPricing.volume]
window_length = timeperiod + 1
def compute(self, today, assets, out, high, low, close, vol):
out[:] = [talib.MFI(high[:, i],
low[:, i],
close[:, i],
vol[:, i],
timeperiod=timeperiod)[-1]
for i in range(len(assets))]
return MFI
@staticmethod
def make_oscillator(fastperiod=12, slowperiod=26, matype=0):
class PPO(CustomFactor):
"""12/26-Day Percent Price Oscillator"""
inputs = [USEquityPricing.close]
window_length = slowperiod
def compute(self, today, assets, out, close_prices):
out[:] = [talib.PPO(close,
fastperiod=fastperiod,
slowperiod=slowperiod,
matype=matype)[-1]
for close in close_prices.T]
return PPO
@staticmethod
def make_stochastic_oscillator(fastk_period=5, slowk_period=3, slowd_period=3,
                               slowk_matype=0, slowd_matype=0):
    class StochasticOscillator(CustomFactor):
        """Stochastic Oscillator"""
        inputs = [USEquityPricing.high,
                  USEquityPricing.low,
                  USEquityPricing.close]
        outputs = ['slowk', 'slowd']
        window_length = fastk_period * 2
        def compute(self, today, assets, out, high, low, close):
            # talib.STOCH returns (slowk, slowd) arrays; keep the latest value per asset
            results = [talib.STOCH(high[:, i],
                                   low[:, i],
                                   close[:, i],
                                   fastk_period=fastk_period,
                                   slowk_period=slowk_period,
                                   slowk_matype=slowk_matype,
                                   slowd_period=slowd_period,
                                   slowd_matype=slowd_matype)
                       for i in range(len(assets))]
            out.slowk[:] = [slowk[-1] for slowk, _ in results]
            out.slowd[:] = [slowd[-1] for _, slowd in results]
    return StochasticOscillator
@staticmethod
def make_trendline(timeperiod=252):
    class Trendline(CustomFactor):
        """52-Week Trendline"""
        inputs = [USEquityPricing.close]
        window_length = timeperiod
        def compute(self, today, assets, out, close_prices):
            out[:] = [talib.LINEARREG_SLOPE(close,
                                            timeperiod=timeperiod)[-1]
                      for close in close_prices.T]
    return Trendline
MOMENTUM_FACTORS = {
'Percent Above Low' : MomentumFactors.PercentAboveLow,
'Percent Below High' : MomentumFactors.PercentBelowHigh,
'Price Oscillator' : MomentumFactors.make_oscillator(),
'Money Flow Index' : MomentumFactors.make_mfi(),
'Directional Movement Index' : MomentumFactors.make_dx(),
'Trendline' : MomentumFactors.make_trendline()
}
momentum_factors, t = factor_pipeline(MOMENTUM_FACTORS)
print('Pipeline run time {:.2f} secs'.format(t))
momentum_factors.info()
class EfficiencyFactors:
@staticmethod
def CapexToAssets(mask):
"""Capital Expenditure divided by Total Assets"""
capex = AnnualizedData(inputs = [Fundamentals.capital_expenditure_asof_date,
Fundamentals.capital_expenditure],
mask=mask)
assets = Fundamentals.total_assets.latest
return - capex / assets
@staticmethod
def CapexToSales(mask):
"""Capital Expenditure divided by Total Revenue"""
capex = AnnualizedData(inputs = [Fundamentals.capital_expenditure_asof_date,
Fundamentals.capital_expenditure],
mask=mask)
revenue = AnnualizedData(inputs = [Fundamentals.total_revenue_asof_date,
Fundamentals.total_revenue],
mask=mask)
return - capex / revenue
@staticmethod
def CapexToFCF(mask):
"""Capital Expenditure divided by Free Cash Flows"""
capex = AnnualizedData(inputs = [Fundamentals.capital_expenditure_asof_date,
Fundamentals.capital_expenditure],
mask=mask)
free_cash_flow = AnnualizedData(inputs = [Fundamentals.free_cash_flow_asof_date,
Fundamentals.free_cash_flow],
mask=mask)
return - capex / free_cash_flow
@staticmethod
def EBITToAssets(mask):
"""Earnings Before Interest and Taxes (EBIT) divided by Total Assets"""
ebit = AnnualizedData(inputs = [Fundamentals.ebit_asof_date,
Fundamentals.ebit],
mask=mask)
assets = Fundamentals.total_assets.latest
return ebit / assets
@staticmethod
def CFOToAssets(mask):
"""Operating Cash Flows divided by Total Assets"""
cfo = AnnualizedData(inputs = [Fundamentals.operating_cash_flow_asof_date,
Fundamentals.operating_cash_flow],
mask=mask)
assets = Fundamentals.total_assets.latest
return cfo / assets
@staticmethod
def RetainedEarningsToAssets(mask):
"""Retained Earnings divided by Total Assets"""
retained_earnings = AnnualizedData(inputs = [Fundamentals.retained_earnings_asof_date,
Fundamentals.retained_earnings],
mask=mask)
assets = Fundamentals.total_assets.latest
return retained_earnings / assets
EFFICIENCY_FACTORS = {
    'CFO To Assets': EfficiencyFactors.CFOToAssets,
    'Capex To Assets': EfficiencyFactors.CapexToAssets,
    'Capex To FCF': EfficiencyFactors.CapexToFCF,
    'Capex To Sales': EfficiencyFactors.CapexToSales,
    'EBIT To Assets': EfficiencyFactors.EBITToAssets,
    'Retained Earnings To Assets': EfficiencyFactors.RetainedEarningsToAssets
}
efficiency_factors, t = factor_pipeline(EFFICIENCY_FACTORS)
print('Pipeline run time {:.2f} secs'.format(t))
efficiency_factors.info()
class RiskFactors:
@staticmethod
def LogMarketCap(mask):
"""Log of Market Capitalization log(Close Price * Shares Outstanding)"""
return np.log(MarketCap(mask=mask))
class DownsideRisk(CustomFactor):
"""Mean returns divided by std of 1yr daily losses (Sortino Ratio)"""
inputs = [USEquityPricing.close]
window_length = 252
def compute(self, today, assets, out, close):
ret = pd.DataFrame(close).pct_change()
out[:] = ret.mean().div(ret.where(ret<0).std())
@staticmethod
def MarketBeta(**kwargs):
"""Slope of 1-yr regression of price returns against index returns"""
return SimpleBeta(target=symbols('SPY'), regression_length=252)
class DownsideBeta(CustomFactor):
"""Slope of 1yr regression of returns on negative index returns"""
inputs = [USEquityPricing.close]
window_length = 252
def compute(self, today, assets, out, close):
t = len(close)
assets = pd.DataFrame(close).pct_change()
start_date = (today - pd.DateOffset(years=1)).strftime('%Y-%m-%d')
spy = get_pricing('SPY',
start_date=start_date,
end_date=today.strftime('%Y-%m-%d')).reset_index(drop=True)
spy_neg_ret = (spy
.close_price
.iloc[-t:]
.pct_change()
.pipe(lambda x: x.where(x<0)))
out[:] = assets.apply(lambda x: x.cov(spy_neg_ret)).div(spy_neg_ret.var())
class Vol3M(CustomFactor):
"""3-month Volatility: Standard deviation of returns over 3 months"""
inputs = [USEquityPricing.close]
window_length = 63
def compute(self, today, assets, out, close):
out[:] = np.log1p(pd.DataFrame(close).pct_change()).std()
RISK_FACTORS = {
'Log Market Cap' : RiskFactors.LogMarketCap,
'Downside Risk' : RiskFactors.DownsideRisk,
'Index Beta' : RiskFactors.MarketBeta,
#'Downside Beta' : RiskFactors.DownsideBeta,
'Volatility 3M' : RiskFactors.Vol3M,
}
risk_factors, t = factor_pipeline(RISK_FACTORS)
print('Pipeline run time {:.2f} secs'.format(t))
risk_factors.info()
def growth_pipeline():
revenue = AnnualizedData(inputs = [Fundamentals.total_revenue_asof_date,
Fundamentals.total_revenue],
mask=UNIVERSE)
eps = AnnualizedData(inputs = [Fundamentals.diluted_eps_earnings_reports_asof_date,
Fundamentals.diluted_eps_earnings_reports],
mask=UNIVERSE)
return Pipeline({'Sales': revenue,
'EPS': eps,
'Total Assets': Fundamentals.total_assets.latest,
'Net Debt': Fundamentals.net_debt.latest},
screen=UNIVERSE)
start_timer = time()
growth_factors = run_pipeline(growth_pipeline(), start_date=START, end_date=END)
for col in growth_factors.columns:
    for month in [3, 12]:
        new_col = col + ' Growth {}M'.format(month)
        # compute growth within each asset, then rank cross-sectionally by date
        growth = growth_factors[col].groupby(level=1).pct_change(month * MONTH)
        growth_factors[new_col] = growth.groupby(level=0).rank()
print('Pipeline run time {:.2f} secs'.format(time() - start_timer))
growth_factors.info()
class QualityFactors:
@staticmethod
def AssetTurnover(mask):
"""Sales divided by average of year beginning and year end assets"""
assets = AnnualAvg(inputs=[Fundamentals.total_assets],
mask=mask)
sales = AnnualizedData([Fundamentals.total_revenue_asof_date,
Fundamentals.total_revenue], mask=mask)
return sales / assets
@staticmethod
def CurrentRatio(mask):
"""Total current assets divided by total current liabilities"""
assets = Fundamentals.current_assets.latest
liabilities = Fundamentals.current_liabilities.latest
return assets / liabilities
@staticmethod
def AssetToEquityRatio(mask):
"""Total current assets divided by common equity"""
assets = Fundamentals.current_assets.latest
equity = Fundamentals.common_stock.latest
return assets / equity
@staticmethod
def InterestCoverage(mask):
"""EBIT divided by interest expense"""
ebit = AnnualizedData(inputs = [Fundamentals.ebit_asof_date,
Fundamentals.ebit], mask=mask)
interest_expense = AnnualizedData(inputs = [Fundamentals.interest_expense_asof_date,
Fundamentals.interest_expense], mask=mask)
return ebit / interest_expense
@staticmethod
def DebtToAssetRatio(mask):
"""Total Debts divided by Total Assets"""
debt = Fundamentals.total_debt.latest
assets = Fundamentals.total_assets.latest
return debt / assets
@staticmethod
def DebtToEquityRatio(mask):
"""Total Debts divided by Common Stock Equity"""
debt = Fundamentals.total_debt.latest
equity = Fundamentals.common_stock.latest
return debt / equity
@staticmethod
def WorkingCapitalToAssets(mask):
"""Current Assets less Current liabilities (Working Capital) divided by Assets"""
working_capital = Fundamentals.working_capital.latest
assets = Fundamentals.total_assets.latest
return working_capital / assets
@staticmethod
def WorkingCapitalToSales(mask):
"""Current Assets less Current liabilities (Working Capital), divided by Sales"""
working_capital = Fundamentals.working_capital.latest
sales = AnnualizedData([Fundamentals.total_revenue_asof_date,
Fundamentals.total_revenue], mask=mask)
return working_capital / sales
class MertonsDD(CustomFactor):
"""Merton's Distance to Default """
inputs = [Fundamentals.total_assets,
Fundamentals.total_liabilities,
libor.value,
USEquityPricing.close]
window_length = 252
def compute(self, today, assets, out, tot_assets, tot_liabilities, r, close):
mertons = []
for col_assets, col_liabilities, col_r, col_close in zip(tot_assets.T, tot_liabilities.T,
r.T, close.T):
vol_1y = np.nanstd(col_close)
numerator = np.log(
col_assets[-1] / col_liabilities[-1]) + ((252 * col_r[-1]) - ((vol_1y ** 2) / 2))
mertons.append(numerator / vol_1y)
out[:] = mertons
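For reference, the quantity this factor approximates is Merton's one-year distance to default,

$$DD = \frac{\ln(V_A / D) + (\mu - \sigma_A^2/2)\,T}{\sigma_A \sqrt{T}},$$

where $V_A$ is the asset value, $D$ the face value of debt, $\mu$ the drift, and $\sigma_A$ the asset volatility. With $T = 1$ year, the compute method proxies $V_A / D$ with total assets over total liabilities, $\mu$ with the annualized LIBOR rate, and $\sigma_A$ with the standard deviation of closing prices over the window (a rough stand-in for true asset volatility).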
QUALITY_FACTORS = {
'AssetToEquityRatio' : QualityFactors.AssetToEquityRatio,
'AssetTurnover' : QualityFactors.AssetTurnover,
'CurrentRatio' : QualityFactors.CurrentRatio,
'DebtToAssetRatio' : QualityFactors.DebtToAssetRatio,
'DebtToEquityRatio' : QualityFactors.DebtToEquityRatio,
'InterestCoverage' : QualityFactors.InterestCoverage,
'MertonsDD' : QualityFactors.MertonsDD,
'WorkingCapitalToAssets': QualityFactors.WorkingCapitalToAssets,
'WorkingCapitalToSales' : QualityFactors.WorkingCapitalToSales,
}
quality_factors, t = factor_pipeline(QUALITY_FACTORS)
print('Pipeline run time {:.2f} secs'.format(t))
quality_factors.info()
class PayoutFactors:
@staticmethod
def DividendPayoutRatio(mask):
"""Dividends Per Share divided by Earnings Per Share"""
dps = AnnualizedData(inputs = [Fundamentals.dividend_per_share_earnings_reports_asof_date,
Fundamentals.dividend_per_share_earnings_reports], mask=mask)
eps = AnnualizedData(inputs = [Fundamentals.basic_eps_earnings_reports_asof_date,
Fundamentals.basic_eps_earnings_reports], mask=mask)
return dps / eps
@staticmethod
def DividendGrowth(**kwargs):
"""Annualized percentage DPS change"""
return Fundamentals.dps_growth.latest
PAYOUT_FACTORS = {
'Dividend Payout Ratio': PayoutFactors.DividendPayoutRatio,
'Dividend Growth': PayoutFactors.DividendGrowth
}
payout_factors, t = factor_pipeline(PAYOUT_FACTORS)
print('Pipeline run time {:.2f} secs'.format(t))
payout_factors.info()
class ProfitabilityFactors:
@staticmethod
def GrossProfitMargin(mask):
"""Gross Profit divided by Net Sales"""
gross_profit = AnnualizedData([Fundamentals.gross_profit_asof_date,
Fundamentals.gross_profit], mask=mask)
sales = AnnualizedData([Fundamentals.total_revenue_asof_date,
Fundamentals.total_revenue], mask=mask)
return gross_profit / sales
@staticmethod
def NetIncomeMargin(mask):
"""Net income divided by Net Sales"""
net_income = AnnualizedData([Fundamentals.net_income_income_statement_asof_date,
Fundamentals.net_income_income_statement], mask=mask)
sales = AnnualizedData([Fundamentals.total_revenue_asof_date,
Fundamentals.total_revenue], mask=mask)
return net_income / sales
PROFITABILITY_FACTORS = {
    'Gross Profit Margin': ProfitabilityFactors.GrossProfitMargin,
    'Net Income Margin': ProfitabilityFactors.NetIncomeMargin,
    # wrap the pre-built factors so they match the callable interface expected by factor_pipeline
    'Return on Equity': lambda **kwargs: Fundamentals.roe.latest,
    'Return on Assets': lambda **kwargs: Fundamentals.roa.latest,
    'Return on Invested Capital': lambda **kwargs: Fundamentals.roic.latest
}
profitability_factors, t = factor_pipeline(PROFITABILITY_FACTORS)
print('Pipeline run time {:.2f} secs'.format(t))
profitability_factors.info()
We will test predictions for various lookahead periods to identify the holding periods that offer the best predictability, as measured by the information coefficient.
More specifically, we compute returns for 1, 5, 10, 20, and 60 days using the built-in Returns factor, resulting in over 25,000 observations for the universe of 50 stocks over the two years 2017-2018 (each containing approximately 252 trading days):
lookahead = [1, 5, 10, 20, 60]
returns = run_pipeline(Pipeline({'Returns{}D'.format(i): Returns(inputs=[USEquityPricing.close],
window_length=i+1, mask=UNIVERSE) for i in lookahead},
screen=UNIVERSE),
start_date=START,
end_date=END)
return_cols = ['Returns{}D'.format(i) for i in lookahead]
returns.info()
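Note that the built-in Returns factor is backward-looking: its value on date t measures the return over the preceding window_length - 1 days. To serve as prediction targets, these columns need to be shifted back within each asset so that each row is paired with the return realized over the following days. A minimal sketch of that alignment, assuming the multi-indexed returns frame above (the helper name is ours):
def to_forward_returns(returns, lookahead):
    """Shift trailing returns back by each horizon so each row holds
    the return realized over the next i days (illustrative helper)"""
    fwd = returns.copy()
    for i in lookahead:
        col = 'Returns{}D'.format(i)
        # shift within each asset so no values leak across equities
        fwd[col] = fwd[col].groupby(level=1).shift(-i)
    return fwd

forward_returns = to_forward_returns(returns, lookahead)
The concatenation below keeps the original frame; the sketch only illustrates the alignment.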
We will use over 50 features that cover a broad range of factors based on market, fundamental, and alternative data. The notebook also includes custom transformations to convert fundamental data, which is typically reported at quarterly frequency, into rolling annual totals or averages to avoid excessive seasonal fluctuations.
Once the factors have been computed, we combine them using pd.concat(), assign index names, and create a categorical variable that identifies the asset for each data point:
data = pd.concat([returns,
value_factors,
momentum_factors,
quality_factors,
payout_factors,
growth_factors,
efficiency_factors,
risk_factors], axis=1).sort_index()
data.index.names = ['date', 'asset']
data['stock'] = data.index.get_level_values('asset').map(lambda x: x.asset_name)
data.info()
# Create a sorted dataframe of features and their missing-value counts
missing_values0 = data.isnull().sum(axis=0).reset_index()
missing_values0.columns = ['column_name', 'missing_count']
missing_values0 = missing_values0.loc[missing_values0['missing_count']>0]
missing_values0 = missing_values0.sort_values(by='missing_count')
# Get the percentage of missing values per feature
total0 = data.isnull().sum().sort_values(ascending=False)
percent0 = (data.isnull().sum()/data.isnull().count()).sort_values(ascending=False)
missing_data0 = pd.concat([total0, percent0], axis=1,join='outer', keys=['Total Missing Count', '% of Total Observations'])
missing_data0.index.name = 'Numeric Feature'
missing_data0.head(len(data.columns))
ind0 = np.arange(missing_values0.shape[0])
width0 = 0.1
fig, ax = plt.subplots(figsize=(13,5))
colors0 = sns.color_palette('Set2', len(ind0))
rects0 = ax.bar(ind0, missing_values0.missing_count.values, color=colors0)
ax.set_xticks(ind0)
ax.set_xticklabels(missing_values0.column_name.values, rotation='vertical')
ax.set_ylabel("Count")
ax.set_title("Missing Observations Count")
ax.margins(0.001)
plt.show()
Next, we remove columns and rows that are missing more than 20 percent of their observations, which costs us six percent of the observations and five columns:
rows_before, cols_before = data.shape
data = (data
.dropna(axis=1, thresh=int(len(data)*.8))
.dropna(thresh=int(len(data.columns) * .8)))
#data = data.fillna(data.median())
data = data.bfill().ffill()
rows_after, cols_after = data.shape
print('{:,d} rows and {:,d} columns dropped'.format(rows_before-rows_after, cols_before-cols_after))
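A quick check confirms that the back- and forward-filling left no missing values behind:
# every remaining cell should now be populated
assert data.notnull().all().all()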
At this point, we have 51 features and the categorical identifier of the stock:
data.sort_index(axis=1).info()
First, let's take a look at the individual distributions of all our features.
data.hist(bins=25, figsize=(22,22))
plt.show()
It is always a good idea to check the relationship between your features and the target variable. Here, we plot each feature against the 60-day return, along with the p-value, R² score, and information coefficient (IC) for each feature.
tmp = data.drop(['Returns1D','Returns5D','Returns10D','Returns20D'], axis=1)
tmp.reset_index(level=['asset'], inplace=True, drop=True)
tmp.head()
def r2(x, y):
return pearsonr(x, y)[0] ** 2
for count, feature in enumerate(tmp.columns, 1):
    if feature == 'Returns60D':
        continue  # skip the target itself
    print('{} # {}'.format(feature, count))
    plt.figure(figsize=(8, 5))
    cm = plt.get_cmap('jet')
    colors = np.linspace(0.1, 1, len(tmp))
    sc = plt.scatter(tmp[feature], tmp['Returns60D'], s=25, c=colors, cmap=cm,
                     edgecolor='k', alpha=0.3, label='Price Data')
    sns.regplot(tmp[feature], tmp['Returns60D'], scatter=False,
                line_kws={'color': 'k', 'lw': 2, 'linestyle': 'dashed'})
    cb = plt.colorbar(sc)
    cb.ax.set_yticklabels([str(p) for p in tmp[::len(tmp) // 9].index],
                          fontdict={'fontsize': 10,
                                    'fontweight': 'medium'})
    plt.xlabel(feature, labelpad=10, fontsize=10, fontweight='medium')
    plt.ylabel('Returns60D', labelpad=10, fontsize=10, fontweight='medium')
    plt.grid(False)
    ic, pval = spearmanr(tmp[feature], tmp['Returns60D'])
    R2 = r2(tmp[feature], tmp['Returns60D'])
    plt.title('r2 = {}, IC = {}, P-Value = {}'.format(round(R2, 4), round(ic, 4), pval))
    plt.tick_params(axis='both', labelsize=10)
    plt.show()
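With the features inspected, the OLS, ridge, and lasso comparisons mentioned at the top can be run on this frame. The following is a minimal in-sample sketch rather than the notebook's full specification (a proper evaluation would use a walk-forward split); it assumes Returns60D has been aligned as a forward return, as sketched earlier, and standardizes the ranked features before fitting:
# compare linear models on the 60-day target (in-sample, for illustration only)
target = 'Returns60D'
features = data.columns.difference(return_cols + ['stock'])
X = StandardScaler().fit_transform(data[features])
y = data[target].values

for name, model in [('OLS', LinearRegression()),
                    ('Ridge', RidgeCV(alphas=np.logspace(-4, 4, 20))),
                    ('Lasso', LassoCV(n_alphas=20))]:
    preds = model.fit(X, y).predict(X)
    rmse = np.sqrt(mean_squared_error(y, preds))
    ic = spearmanr(y, preds)[0]
    print('{:<6} RMSE: {:.4f} | IC: {:.4f}'.format(name, rmse, ic))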