# -*- coding: utf-8 -*- import os import calendar import datetime as dt from chinese_calendar import is_workday from tqdm import tqdm import numpy as np import pandas as pd from scipy.stats import norm import plotly import plotly.express as px import plotly.graph_objects as go # hugo-code:vars OP_C = 0.3557 # 当前期权价格 OP_S = 2.533 # 当前标的资产价格 OP_K = 2.25 # 行权价 OP_T = 16/365 # 期权剩余时间(年) OP_r = 0.0264 # 无风险利率,用shibor近似 OP_q = 0.013675 # 年化分红率 OP_sigma = 0.23427 # 标的资产波动率 OP_option_type = 'C'# 期权类型 # hugo-code:bsm def bsm_value(s,k,t,r,sigma,q,option_type): d1 = (np.log( s/k ) + ( r -q + 0.5*sigma**2 )*t ) / ( sigma*np.sqrt(t)) d2 = d1 - sigma * np.sqrt(t) if option_type == 'C': value = s*np.exp(-q*t)*norm.cdf( d1) - k*np.exp(-r*t)*norm.cdf( d2) else: value = k*np.exp(-r*t)*norm.cdf(-d2) - s*np.exp(-q*t)*norm.cdf(-d1) return value call_price = bsm_value(OP_S, OP_K, OP_T, OP_r, OP_sigma, OP_q, OP_option_type) print(f"The call option price is {call_price}") # The call option price is 0.28437779243840744 # hugo-code:iv def bsm_imp_vol(s,k,t,r,c,q,option_type): c_est = 0 # 期权价格估计值 top = 1 #波动率上限 floor = 0 #波动率下限 sigma = (floor + top)/2 #波动率初始值 count = 0 # 计数器 while abs(c - c_est) > 0.000001: c_est = bsm_value(s,k,t,r,sigma,q,option_type) # 根据价格判断波动率是被低估还是高估,并对波动率做修正 count += 1 if count > 100: # 时间价值为0的期权是算不出隐含波动率 sigma = 0 break if c - c_est > 0: # f(x)>0 floor = sigma sigma = (sigma + top)/2 else: top = sigma sigma = (sigma + floor)/2 return sigma sigma = bsm_imp_vol(OP_S, OP_K, OP_T, OP_r, OP_C, OP_q, OP_option_type) print(f"The call option imp_vol is {sigma}") # The call option imp_vol is 0.8988456726074219 # hugo-code:end_iv # 数据读取 startdate = dt.datetime(2015, 2, 9) enddate = dt.datetime(2018, 9, 21) datapath = "~/data/stock/sse_options/" df_op = pd.read_csv(os.path.join(datapath, '50ETF期权日行情_2015-2018.csv'),usecols=['date','trade_code','收盘价','成交量']) df_fd = pd.read_csv(os.path.join(datapath, '50ETF基金日行情_2013-2018.csv'),usecols=['date','close']) df_shibor = pd.read_csv(os.path.join(datapath, 'shibor_All_2015-2018.csv'),usecols=['date','1w']) # 利率 df_shibor.date = df_shibor.date.apply(lambda x: str(x)[:4]+'/'+str(x)[4:6]+'/'+str(x)[6:])# 日期格式 df_shibor.date = pd.to_datetime(df_shibor.date) df_op.date = pd.to_datetime(df_op.date) df_fd.date = pd.to_datetime(df_fd.date) df_op = df_op.sort_values('date').reset_index(drop=True) # 构建索引并合并 df = pd.merge(df_op, df_fd, on='date', how='left') df = pd.merge(df, df_shibor, on='date', how='left') df.columns = ['date','ts_code', 'c', 'vol', 's', 'r'] df['r'] = df['r'] / 100 df['k'] = [int(i[-4:]) / 1000 for i in df.ts_code] # 获取行权价格 df['call_put'] = [i[6] for i in df.ts_code] # 分离看涨、看跌 df['adjust'] = [i[11] for i in df.ts_code] df['vol'] = df['vol'].apply(lambda x: x.replace(',','')).astype('int') # 成交量含有非法字符“,”,去除后转换为整型 df = df[df['vol'] >= 20] # 数据清理 df = df[df['c'] >= 0.0001] df = df[(df.date >= startdate) & (df.date <= enddate)] # 计算到期日 & 剔除交割日期小于7天 def prompt_day(m_str): # 交割日计算 year = int('20' + m_str[:2]) month = int(m_str[2:]) ndays = calendar.monthrange(year, month)[1] wedcount = 0 for i in range(1, ndays + 1): d = dt.date(year, month, i) if d.isoweekday() == 3: # 交割日为每月第四周星期三 wedcount += 1 if wedcount >= 4 and is_workday(d): # 年假 return d raise Exception('交割日计算,异常月份 {}'.format(m_str)) df['t'] = pd.to_datetime([prompt_day(x) for x in [i[-10:-6] for i in df.ts_code]]) # date df.t = (df.t - df.date).apply(lambda x:x.days) # time delta df = df[df.t >= 7] # 剔除小于7天 df.t = df.t / 365 # hugo-code:data_clear def cons(d): # 条件约束,剔除噪点,尾盘拉砸 if d['call_put'] == 'C': price = max(d.s - d.k * np.exp(-d.r * d.t), 0) return price < d.c and d.c < d.s else: price = max(d.k * np.exp(-d.r * d.t) - d.s, 0) return price < d.c and d.c < d.k * np.exp(-d.r * d.t) df = df[df.apply(cons, axis=1)] # hugo-code:iq def iq(df): # 计算隐含分红率 q = -np.log((df.c_c + df.k * np.exp(-df.r * df.t) - df.c_p)/(df.s)) / df.t return q # hugo-code:end_iq # 看涨看跌分类 rColumns_to_c = {'ts_code':'ts_code_c','c':'c_c','call_put':'call_put_c','vol':'vol_c'} rColumns_to_p = {'ts_code':'ts_code_p','c':'c_p','call_put':'call_put_p','vol':'vol_p'} rColumns_from_c = {'c_c':'c','ts_code_c':'ts_code','call_put_c':'call_put','vol_c':'vol'} rColumns_from_p = {'c_p':'c','ts_code_p':'ts_code','call_put_p':'call_put','vol_p':'vol'} rColumns_on = ['date','k','t','r','s','adjust'] df_c = df[df['call_put'] == 'C'].rename(columns=rColumns_to_c) df_p = df[df['call_put'] == 'P'].rename(columns=rColumns_to_p) df_cp = pd.merge(df_p, df_c, how='outer',on=rColumns_on) df_cp['q'] = df_cp.apply(iq, axis = 1) # 计算隐含分红率 df_cp.fillna({'q' : 0}, inplace=True) # 对没有找到相匹配期权合约的进行空值填充 df_c = df_cp[['ts_code_c','c_c','call_put_c','vol_c', 'q'] + rColumns_on].rename(columns=rColumns_from_c) df_p = df_cp[['ts_code_p','c_p','call_put_p','vol_p', 'q'] + rColumns_on].rename(columns=rColumns_from_p) df = pd.concat([df_c, df_p]).dropna(axis=0) # 删除空值项 df = df.sort_values('date').reset_index(drop=True) tqdm.pandas(desc='calc iv') df['iv'] = df.progress_apply(lambda x: bsm_imp_vol(x.s,x.k,x.t,x.r,x.c,x.q,x.call_put), axis=1) # 重新排一下序 t_columns = ['date', 'ts_code', 'call_put','volume','t', 'k', 'c', 's', 'r', 'q','iv'] df = df.reindex(columns=t_columns) # # 保存数据到 CSV 文件 # df.to_csv('cache_data_iv.csv', index=False) # df = pd.read_csv('cache_data_iv.csv') def data_pivot(df): # 数据透视 df = df.reset_index() option_type = 'C' # 具有相同执行价格、剩余到期时间的看涨看跌期权隐含波动率相等 df = df[df['call_put']==option_type] df = df.drop(['ts_code','date','c','s','r','call_put','q'],axis=1) df['t'] = df['t'] * 365 df['t'] = df['t'].astype(int) df = df.pivot_table(index=["k"],columns=["t"],values=["iv"]) df.columns = df.columns.droplevel(0) df.index.name = None df = df.reset_index() df = df.rename(columns={'index':'k'}) df = df.round(7) return df def fitting(df): # 多项式拟合 col_list = df.columns for i in range(df.shape[1]-1): x_col = col_list[0] y_col = col_list[i+1] df1 = df.dropna(subset=[y_col]) x = df1.iloc[:,0] y = df1.iloc[:,i+1] weights = np.polyfit(x, y, 2) model = np.poly1d(weights) predict = np.poly1d(model) x_given_list = df[pd.isnull(df[y_col]) == True][x_col].tolist() for x_given in x_given_list: # 所有空值对应的k组成列表 y_predict = predict(x_given) df.loc[df[x_col]==x_given, y_col] = y_predict return df # hugo-code:smile_plot def smile_plot(df): # 波动率微笑 df = df.set_index('k') df = df.stack().reset_index() df.columns = ['k', 'days', 'iv'] fig = px.line(df, x="k", y="iv", color="days",line_shape="spline") fig.write_image("smile_plot.png") fig.write_json("smile_plot.json") # hugo-code:im_surface def im_surface(df): # 波动率曲面 df = fitting(df) # 多项式拟合,详见代码文件 df = df.set_index('k') y = np.array(df.index) x = np.array(df.columns) fig = go.Figure(data=[go.Surface(z=df.values, x=x, y=y)]) fig.update_layout(scene = dict( xaxis_title='剩余期限', yaxis_title='执行价格', zaxis_title='隐含波动率'), autosize=True, margin=dict(r=20, b=10, l=10, t=10), ) fig.write_image("im_surface.png") fig.write_json("im_surface.json") # hugo-code:end_im_surface date_info = '2018-07-23' # 选择一天 df_t = df[df.date == date_info] # 获取具体某一天的所有期权合约 df_t = df_t.reset_index(drop=True) df_t = data_pivot(df_t) # 数据透视表 df_t.to_csv('data_pivot.csv', index=False) smile_plot(df_t) im_surface(df_t)