Download Intraday Stock Data with IEX and Parquet
Post Outline
- Why IEX?
- Why Parquet?
- System Outline
- Code
- Links
WHY IEX?
IEX is a relatively new exchange (founded in 2012). For our purposes, what sets it apart from other exchanges is that it provides a robust, FREE API for querying its stock exchange data. As a result, we can use the pandas-datareader package to query IEX data quite simply.
WHY PARQUET?
I don't use Hadoop; however, Parquet is a great storage format within the pandas ecosystem as well. It is fast, stable, flexible, and comes with easy compression built in. I originally learned about the format when some of my datasets were too large to fit in memory and I started to use Dask as a drop-in replacement for Pandas. It blows away CSVs, and I found it more stable and consistent than HDF5 files.
SYSTEM OUTLINE
This system will query and store quotes for ~630 ETF symbols every 30 seconds during market hours. To view the project setup, visit the GitHub repo. We start by outlining the system process.
- The system starts with the iex_downloader.py script. This script will:
  - instantiate the logger,
  - get today's market hours and date,
  - handle timezone conversions to confirm the script only runs during market hours,
  - if it is market hours, query the IEX API, format the data, and write it to an interim data storage location,
  - if it is not market hours, query no data and issue a warning.
- The second component is the iex_downloader_utils.py script. This script provides utility functions to format the response data and store it properly.
- The third component is the iex_eod_processor.py script. This script's tasks are to:
  - run after the end of the market session,
  - read the single day's worth of collected intraday data as a Pandas dataframe (if the dataset is too big for memory, you can switch to Dask),
  - drop any duplicates or NaN rows,
  - store the result in a final `processed` data folder as a single compressed file containing one day's worth of intraday quote data,
  - delete the day's data stored in the `interim` folder to free up disk space.
- The final component is the task scheduler. In Linux this is carried out using the `crontab` application. For Windows/Mac systems you will have to adapt the logic to your specific OS. In the `./src/data/iex_cronjob.txt` file I give a template of the tasks that need to be scheduled. These tasks are:
  - Every minute between 7am and 2pm Mountain Time, Monday through Friday, run the iex_downloader.py script.
  - Every minute between 7am and 2pm Mountain Time, Monday through Friday, wait 30 seconds and then run the iex_downloader.py script. Crontab has no resolution finer than one minute, so we work around that by repeating the task with a timed delay.
  - At 2:10pm Mountain Time, Monday through Friday, run the iex_eod_processor.py script.
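To make the market-hours check concrete, here is a minimal standard-library sketch of the idea. It is not the project's code: the actual script uses tzlocal and pandas_market_calendars, while this version hard-codes an assumed regular NYSE session (9:30am to 4:00pm Eastern) and ignores holidays and early closes. The name `is_market_open` is hypothetical.

```python
from datetime import datetime, time
from zoneinfo import ZoneInfo  # Python 3.9+

NYSE_TZ = ZoneInfo("America/New_York")
# Assumption: regular session only, no holidays or early closes.
SESSION_OPEN, SESSION_CLOSE = time(9, 30), time(16, 0)


def is_market_open(now):
    """Return True if the timezone-aware `now` falls inside the regular weekday session."""
    ny_now = now.astimezone(NYSE_TZ)  # convert local time to exchange time
    is_weekday = ny_now.weekday() < 5  # Monday=0 ... Friday=4
    return is_weekday and SESSION_OPEN <= ny_now.time() <= SESSION_CLOSE


denver = ZoneInfo("America/Denver")
print(is_market_open(datetime(2018, 3, 5, 8, 0, tzinfo=denver)))    # Monday 8:00am MT
print(is_market_open(datetime(2018, 3, 5, 14, 30, tzinfo=denver)))  # Monday 2:30pm MT
```

8:00am Mountain Time is 10:00am in New York, so the first call prints True; 2:30pm Mountain Time is after the close, so the second prints False. This is why the regular session corresponds to roughly 7:30am to 2:00pm on a machine running in Mountain Time.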
CODE
First the iex_downloader.py script.
from pathlib import PurePath, Path
import sys
import tzlocal # pip install
## get project dir
pdir = PurePath("/YOUR/DIRECTORY/iex_intraday_equity_downloader")
data_dir = pdir/'data'
script_dir = pdir /'src'/'data'
sys.path.append(script_dir.as_posix())
from iex_downloader_utils import split_timestamp, write_to_parquet
import pandas as pd
import pandas_datareader.data as web
pd.options.display.float_format = '{:,.4f}'.format
import numpy as np
import pandas_market_calendars as mcal # pip install
import pyarrow as pa
import pyarrow.parquet as pq
import logzero
from logzero import logger
#=============================================================================
# get current timestamp
now = pd.to_datetime('today')
#=============================================================================
## setup logger
logfile = PurePath(pdir/'logs'/'equity_downloader_logs'/f'iex_downloader_log_{now.date()}.log').as_posix()
log_format = '%(color)s[%(levelname)1.1s %(asctime)s.%(msecs)03d %(module)s:%(lineno)d]%(end_color)s %(message)s'
formatter = logzero.LogFormatter(fmt=log_format, datefmt='%Y-%m-%d %I:%M:%S')
logzero.setup_default_logger(logfile=logfile, formatter=formatter)
#=============================================================================
# confirm market hours
local_tz = tzlocal.get_localzone() # get local timezone via tzlocal package
now_local_tz = now.tz_localize(local_tz) # localize current timestamp
nyse = mcal.get_calendar('NYSE') # get NYSE calendar
nyseToday = nyse.schedule(start_date=now.date(), end_date=now.date())
mktOpen = nyseToday.market_open.iloc[0].tz_convert(local_tz)
mktClose = nyseToday.market_close.iloc[0].tz_convert(local_tz)
if mktOpen <= now_local_tz <= mktClose: # only run during market hours
    #==========================================================================
    # import symbols
    logger.info('importing symbols...')
    symfp = Path(data_dir/'external'/'ETFList.Options.Nasdaq__M.csv')
    symbols = (pd.read_csv(symfp).Symbol).tolist()
    #==========================================================================
    # request data
    logger.info('requesting data from iex...')
    data = (web.DataReader(symbols,'iex-tops')
            .assign(lastSaleTime=lambda df:pd.to_datetime(df.lastSaleTime,unit='ms'))
            .assign(lastUpdated=lambda df:pd.to_datetime(df.lastUpdated,unit='ms'))
            .pipe(split_timestamp, timestamp=now)
            .dropna())
    # force float conversion for the following columns
    # this is due to a problem reading in the data when schema changes
    # for example when these columns are populated the data is float, when not,
    # value is 0, then int64 dtypes causes schema change and read error
    to_float = ['askPrice','bidPrice','lastSalePrice','marketPercent']
    # assign whole columns so the dtype change sticks; setting through
    # .loc[:, cols] can keep the old dtype in newer pandas versions
    data[to_float] = data[to_float].astype(float)
    if data.empty: logger.warning('data df is empty!')
    #==========================================================================
    # store data
    logger.info('storing data to interim intraday_store')
    outfp = PurePath(data_dir/'interim'/'intraday_store').as_posix()
    write_to_parquet(data, outfp, logger=logger)
else:
    logger.warning('system outside of market hours, no data queried')
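The float cast near the end of the script is easier to see with a toy example. The sketch below uses two made-up snapshots (the values and the helper name `force_float` are illustrative, not real IEX output): one where the price columns come back as integer zeros and one where they hold real prices. Without the cast, the two snapshots carry different dtypes, which is the kind of schema mismatch the comments in the script describe.

```python
import pandas as pd

TO_FLOAT = ["askPrice", "bidPrice"]

# Hypothetical snapshots: a quiet one with integer zeros, an active one with floats.
quiet = pd.DataFrame({"symbol": ["SPY"], "askPrice": [0], "bidPrice": [0]})
active = pd.DataFrame({"symbol": ["SPY"], "askPrice": [270.15], "bidPrice": [270.10]})


def force_float(df, cols=TO_FLOAT):
    """Return a copy of `df` with the given columns cast to float64."""
    return df.astype({col: "float64" for col in cols})


# Before the cast the price columns disagree (integer vs. float).
print(quiet["askPrice"].dtype, active["askPrice"].dtype)

# After the cast both snapshots share the same column dtypes.
print(force_float(quiet).dtypes.equals(force_float(active).dtypes))
```

Casting every snapshot before writing means every file in the interim store is written with float price columns, so reading them back as one dataset does not depend on whether a given snapshot happened to contain zeros.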
Next the iex_downloader_utils.py script.
import pandas as pd
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
def split_timestamp(df, timestamp):
    # use current timestamp
    df = df.assign(queryTime=lambda df: timestamp,
                   year=lambda df: timestamp.year, # make year
                   month=lambda df: timestamp.month, # make month
                   day=lambda df: timestamp.day, # make day
                   time=lambda df: timestamp.strftime('%H:%M:%S')) # make time
    return df

def write_to_parquet(df, root_path,
                     partition_cols=['year','month','day','time'],
                     logger=None):
    """
    fn: wrapper for pyarrow write_to_dataset

    Params
    ------
    df : pd.DataFrame
        formatted dataframe data
    root_path : str, data store directory
    partition_cols : list of columns (as str dtype) to partition parquet storage directory
    logger : logger object
    """
    if not logger: raise ValueError('must use logger object')
    try:
        table = pa.Table.from_pandas(df)
        pq.write_to_dataset(table, root_path=root_path,
                            partition_cols=partition_cols)
    except Exception as e:
        logger.exception(e)
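Because `write_to_parquet` partitions on `year`, `month`, `day`, and `time`, each query lands in its own nested `key=value` directory under the interim store. The sketch below only mimics that naming convention with the standard library so the layout is easy to picture; `partition_path` is a hypothetical helper, not part of pyarrow, and the exact way pyarrow writes the `time` segment (for example, whether colons are escaped) may vary by version.

```python
from datetime import datetime
from pathlib import PurePosixPath


def partition_path(root, timestamp, cols=("year", "month", "day", "time")):
    """Build the key=value directory path that one query's snapshot would fall under."""
    values = {
        "year": timestamp.year,
        "month": timestamp.month,
        "day": timestamp.day,
        "time": timestamp.strftime("%H:%M:%S"),  # same format as split_timestamp
    }
    path = PurePosixPath(root)
    for col in cols:
        path = path / f"{col}={values[col]}"
    return path


print(partition_path("data/interim/intraday_store", datetime(2018, 3, 5, 9, 30, 0)))
# data/interim/intraday_store/year=2018/month=3/day=5/time=09:30:00
```

With this layout, everything collected during a session sits under a single `year=...` directory, which makes the end-of-day cleanup a single directory removal.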
Next the iex_eod_processor.py script.
from pathlib import PurePath, Path
import sys
import shutil
## get project dir
pdir = PurePath("/YOUR/DIRECTORY/iex_intraday_equity_downloader")
data_dir = pdir/'data'
script_dir = pdir /'src'/'data'
sys.path.append(script_dir.as_posix())
import pandas as pd
pd.options.display.float_format = '{:,.4f}'.format
import logzero
from logzero import logger
#=============================================================================
# get current timestamp
now = pd.to_datetime('today')
#=============================================================================
## setup logger
logfile = PurePath(pdir/'logs'/'equity_downloader_logs'/f'iex_downloader_log_{now.date()}.log').as_posix()
log_format = '%(color)s[%(levelname)1.1s %(asctime)s.%(msecs)03d %(module)s:%(lineno)d]%(end_color)s %(message)s'
formatter = logzero.LogFormatter(fmt=log_format, datefmt='%Y-%m-%d %I:%M:%S')
logzero.setup_default_logger(logfile=logfile, formatter=formatter)
#=============================================================================
# read intraday data into one dataframe
logger.info('reading all intraday data for today as dataframe...')
infp = PurePath(data_dir/'interim'/'intraday_store').as_posix()
try:
    df = pd.read_parquet(infp).drop_duplicates().dropna()
    if df.empty: logger.warning('empty dataframe for eod processing')
    #==========================================================================
    # store intraday data into one compressed dataframe
    logger.info('storing all intraday data for today as compressed parquet file...')
    outfp = PurePath(data_dir/'processed'/'intraday_store'/f'etf_intraday_data_{now.date()}.parq')
    df.to_parquet(outfp, engine='fastparquet')
    #==========================================================================
    # delete interim store
    logger.info('deleting all interim intraday data.')
    rmfp = Path(data_dir/'interim'/'intraday_store'/f'year={now.year}')
    shutil.rmtree(rmfp)
except Exception as e:
    logger.error(f'{e}\tlikely no data today: {now.date()}')
Finally the cronjob task template.
* 7-14 * * mon-fri /YOUR/DIRECTORY/anaconda3/envs/iex_downloader_env/bin/python3.6 '/YOUR/DIRECTORY/iex_intraday_equity_downloader/src/data/iex_downloader.py' >> /YOUR/DIRECTORY/iex_intraday_equity_downloader/logs/equity_downloader_logs/iex_downloader_log.log
* 7-14 * * mon-fri sleep 30; /YOUR/DIRECTORY/anaconda3/envs/iex_downloader_env/bin/python3.6 '/YOUR/DIRECTORY/iex_intraday_equity_downloader/src/data/iex_downloader.py' >> /YOUR/DIRECTORY/iex_intraday_equity_downloader/logs/equity_downloader_logs/iex_downloader_log.log
10 14 * * mon-fri /YOUR/DIRECTORY/anaconda3/envs/iex_downloader_env/bin/python3.6 '/YOUR/DIRECTORY/iex_intraday_equity_downloader/src/data/iex_eod_processor.py' >> /YOUR/DIRECTORY/iex_intraday_equity_downloader/logs/equity_downloader_logs/iex_downloader_log.log
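To see what the two downloader entries add up to, the following sketch enumerates the start times they produce in one day. It is purely illustrative (the function name `downloader_run_times` is made up) and assumes the machine's clock is set to Mountain Time, since cron schedules against the system's local time.

```python
from datetime import datetime, timedelta


def downloader_run_times(day, first_hour=7, last_hour=14):
    """List the start times produced by the plain entry and the `sleep 30` entry."""
    runs = []
    for hour in range(first_hour, last_hour + 1):  # cron's 7-14 range is inclusive
        for minute in range(60):                   # `*` in the minute field
            on_the_minute = datetime(day.year, day.month, day.day, hour, minute)
            runs.append(on_the_minute)                          # first entry
            runs.append(on_the_minute + timedelta(seconds=30))  # `sleep 30` entry
    return runs


runs = downloader_run_times(datetime(2018, 3, 5))
print(len(runs))                                         # 960
print(runs[0].time(), runs[1].time(), runs[-1].time())   # 07:00:00 07:00:30 14:59:30
```

Note that the `7-14` hour field keeps firing through 14:59, so some invocations fall before the open or after the close. Those runs are harmless because the downloader's own market-hours check logs a warning and exits without querying.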
When everything is running correctly you should see an example log file that looks like the image below.
The interim folder system will look something like the below image.
After the market closes and the eod processor script runs we can import the final dataset into a Jupyter notebook easily.
How many unique symbols?