import typing
import pandas
from .helper import Helper
[docs]
class Analyzer:
'''
Provide functionality for analyzing data.
'''
[docs]
def sort_equity_index_close(
self,
csv_file: str,
excel_file: str,
) -> pandas.DataFrame:
'''
Sort the DataFrame in descending order by equity index closing values.
Parameters
----------
csv_file : str
Path to the input CSV file generated by either
:meth:`BharatFinTrack.NSETRI.download_equity_close`
or :meth:`BharatFinTrack.NSEPRI.download_equity_close`.
excel_file : str
Path to an Excel file to save the output DataFrame.
Returns
-------
DataFrame
DataFrame sorted in descending order based on the index closing value column.
'''
# Check static type of input variable origin
Helper()._validate_variable_origin_static_type(
vars_types=typing.get_type_hints(
obj=self.sort_equity_index_close
),
vars_values=locals()
)
# Validate output file path
Helper()._validate_file_path(
input_file=excel_file,
input_ext='.xlsx'
)
# Read DataFrame
df = Helper()._csv_date_format(
csv_file=csv_file,
date_cols=['Base Date', 'Close Date']
)
# Sort DataFrame
df = df.sort_values(
by=['Close Value'],
ascending=[False]
).reset_index(drop=True)
df = df.drop(
columns=['ID']
)
# Write DataFrame to the Excel file
with pandas.ExcelWriter(
path=excel_file,
engine='xlsxwriter'
) as excel_writer:
df.to_excel(
excel_writer=excel_writer,
index=False
)
workbook = excel_writer.book
worksheet = excel_writer.sheets['Sheet1']
header_format = workbook.add_format(
{'bold': True, 'align': 'center'}
)
for idx, col in enumerate(df.columns):
if col == 'Index Name':
worksheet.set_column(idx, idx, 60)
else:
worksheet.set_column(idx, idx, 15)
worksheet.write(0, idx, col, header_format)
return df
[docs]
def correction_recovery_cycles(
self,
csv_file: str,
excel_file: str,
minimum_gain: int | float = 10,
mult_correction: int | float = 2.5,
mult_recovery: int | float = 10
) -> pandas.DataFrame:
'''
Identify significant turning points in the security's historical performance, focusing on consecutive
corrections and recoveries. A minimum percentage gain is required between two consecutive recoveries
to qualify as significant. The function also calculates the frequency of corrections
(declines from a peak to the next trough) and recoveries (gains from a trough to the next peak),
applying specified multipliers to filter the movements. The output offers a comprehensive overview
of the security's behavior over time.
Parameters
----------
csv_file : str
Path to the CSV file obtained from :meth:`BharatFinTrack.NSETRI.download_daily_data`.
excel_file : str
Path to an Excel file to save the output DataFrame.
minimum_gain : float, optional
The minimum percentage gain required between two consecutive tops. Default is 10.
mult_correction : float, optional
Multiplier applied to calculate the magnitude of corrections (top to bottom). Default is 2.5.
mult_recovery : float, optional
Multiplier applied to calculate the magnitude of recoveries (bottom to top). Default is 10.
Returns
-------
DataFrame
DataFrame containing the analysis results, with columns indicating identified tops,
bottoms, corrections, recoveries, and their respective multipliers.
'''
# Check static type of input variable origin
Helper()._validate_variable_origin_static_type(
vars_types=typing.get_type_hints(
obj=self.correction_recovery_cycles
),
vars_values=locals()
)
# Validate output file path
Helper()._validate_file_path(
input_file=excel_file,
input_ext='.xlsx'
)
# Read DataFrame
df = Helper()._csv_date_format(
csv_file=csv_file,
date_cols=['Date']
)
# Start and end dates
start_date = df['Date'].min()
end_date = df['Date'].max()
# Monthly first date
ms_dates = pandas.date_range(
start=start_date,
end=end_date,
freq='MS'
).date
ms_dates = pandas.Series(
[start_date] + list(ms_dates) + [end_date]
).unique()
# DataFrame of monthly open and close value
month_df = pandas.DataFrame()
for idx, dates in enumerate(zip(ms_dates[:-1], ms_dates[1:])):
idx_df = df[(df['Date'] >= dates[0]) & (df['Date'] < dates[1])]
month_df.loc[idx, 'Date'] = idx_df.iloc[0, 0]
month_df.loc[idx, 'Open'] = idx_df.iloc[0, 1]
month_df.loc[idx, 'High'] = idx_df.iloc[:, 1].max()
month_df.loc[idx, 'Low'] = idx_df.iloc[:, 1].min()
month_df.loc[idx, 'Close'] = idx_df.iloc[-1, -1]
# Summary
summary = {}
summary['Start date'] = start_date
summary['End date'] = end_date
date_diff = Helper()._date_difference(
start_date=month_df['Date'].iloc[0],
end_date=end_date
)
summary['Age'] = f"{date_diff['years']}Y-{date_diff['months']}M-{date_diff['days']}D"
total_years = date_diff['years'] + (date_diff['months'] / 12) + (date_diff['days'] / 365)
summary['Open'] = month_df.iloc[0, 1]
summary['High(H)'] = month_df.iloc[:, 2].max()
summary['Close(C)'] = month_df.iloc[-1, -1]
summary['CAGR(%)'] = 100 * (pow(summary['Close(C)'] / summary['Open'], 1 / total_years) - 1)
summary['Correction(%)'] = 100 * (month_df.iloc[-1, -1] - summary['High(H)']) / summary['High(H)']
# Top value DataFrame with minimum gain
top_df = pandas.DataFrame()
i = 0
while i <= month_df.shape[0] - 1:
if i != month_df.shape[0] - 1 and month_df.iloc[i + 1, 2] > month_df.iloc[i, 2]:
pass
elif len(top_df) == 0:
top_df = pandas.concat([top_df, month_df.iloc[i, :].to_frame().T])
else:
gain = 100 * (month_df.iloc[i, 2] - top_df.iloc[-1, 2]) / top_df.iloc[-1, 2]
top_df = pandas.concat([top_df, month_df.iloc[i, :].to_frame().T]) if gain >= minimum_gain else top_df
i = i + 1
top_df = top_df.drop(columns=['Open', 'Low', 'Close']).reset_index(drop=True)
top_df = top_df.rename(
columns={
'Date': 'Month-H',
'High': 'Value-H'
}
)
top_df['Gain-H(%)'] = 100 * top_df['Value-H'].pct_change(fill_method=None)
# Bottom value DataFrame
bottom_df = pandas.DataFrame()
if len(top_df) == 1:
low_df = month_df[month_df['Low'] == month_df['Low'].min()].iloc[-1, :].to_frame().T
bottom_df = pandas.concat([bottom_df, low_df])
else:
top_dates = top_df.iloc[:, 0].tolist() + [end_date]
for td in zip(top_dates[:-1], top_dates[1:]):
td_df = month_df[(month_df.iloc[:, 0] >= td[0]) & (month_df.iloc[:, 0] < td[1])]
low_df = td_df[td_df['Low'] == td_df['Low'].min()].iloc[-1, :].to_frame().T
bottom_df = pandas.concat([bottom_df, low_df])
bottom_df = bottom_df.drop(columns=['Open', 'High', 'Close']).reset_index(drop=True)
bottom_df = bottom_df.rename(
columns={
'Date': 'Month-L',
'Low': 'Value-L'
}
)
# Combine top and bottom DataFrames
tb_df = pandas.concat([top_df, bottom_df], axis=1)
tb_df['Gain-L(%)'] = 100 * tb_df['Value-L'].pct_change(fill_method=None)
# Coutning months of correction
correction_time = [
Helper()._date_difference(
start_date=sd,
end_date=ed
) for sd, ed in zip(tb_df['Month-H'], tb_df['Month-L'])
]
correction_month = [
ct['years'] * 12 + ct['months'] for ct in correction_time
]
tb_df['Month(H-L)'] = correction_month
tb_df['H-L(%)'] = 100 * (tb_df['Value-L'] - tb_df['Value-H']) / tb_df['Value-H']
# Counting months of recovery
recovery_time = [
Helper()._date_difference(
start_date=sd,
end_date=ed
) for sd, ed in zip(tb_df['Month-L'].shift(periods=1), tb_df['Month-H'])
]
recovery_month = [
rt['years'] * 12 + rt['months'] for rt in recovery_time
]
tb_df['Month(L-H)'] = recovery_month
tb_df['L-H(%)'] = 100 * (tb_df['Value-H'] - tb_df['Value-L'].shift(periods=1)) / tb_df['Value-L'].shift(periods=1)
tb_df.iloc[0, -2] = tb_df.iloc[0, -1]
# Counting correction
count_correction = mult_correction * (tb_df['H-L(%)'] / mult_correction).apply(
lambda x: round(x) if pandas.notnull(x) else x
)
correction_df = count_correction.value_counts().to_frame().reset_index()
correction_df['Count(%)'] = 100 * correction_df['count'] / correction_df['count'].sum()
correction_df = correction_df.sort_values(
by='H-L(%)',
ascending=[True],
ignore_index=True
)
correction_df = correction_df.rename(
columns={
'H-L(%)': f'H-L({-mult_correction}%)',
'count': 'Count'
}
)
# Counting recovery
count_recovery = mult_recovery * (tb_df['L-H(%)'] / mult_recovery).apply(
lambda x: round(x) if pandas.notnull(x) else x
)
recovery_df = count_recovery.value_counts().to_frame().reset_index()
recovery_df['Count(%)'] = 100 * recovery_df['count'] / recovery_df['count'].sum()
recovery_df = recovery_df.sort_values(
by='L-H(%)',
ascending=[True],
ignore_index=True
)
recovery_df = recovery_df.rename(
columns={
'L-H(%)': f'L-H({mult_recovery}%)',
'count': 'Count'
}
)
# Summay DataFrame
summary['Recent Low(L)'] = bottom_df.iloc[-1, -1]
summary['Recovery(%)'] = round(100 * (month_df.iloc[-1, -1] - summary['Recent Low(L)']) / summary['Recent Low(L)'], 1)
summary_df = pandas.DataFrame(
data=summary.values(),
index=summary.keys()
).reset_index()
summary_df.columns = [
'Feature',
'Value'
]
# Concatenate dataframes
dfs = [
summary_df,
tb_df,
correction_df,
recovery_df
]
level_names = [
'Analysis',
'Column name'
]
output_df = pandas.concat(
objs=dfs,
axis=1,
keys=[
'Summary',
'Correction and recovery(Minimum gain={}%)'.format(minimum_gain),
'Countring correction',
'Counting recovery',
],
names=level_names
)
# Saving DataFrames
with pandas.ExcelWriter(
path=excel_file,
engine='xlsxwriter'
) as excel_writer:
output_df.to_excel(
excel_writer=excel_writer,
index_label=['Row number']
)
workbook = excel_writer.book
worksheet = excel_writer.sheets['Sheet1']
worksheet.set_column(0, 0, 15, workbook.add_format({'align': 'center'}))
for idx, col in enumerate(output_df.columns, start=1):
if col[1] == 'Feature':
worksheet.set_column(
idx, idx, 15,
workbook.add_format({'align': 'left'})
)
elif any(col[1].startswith(j) for j in ['Month', 'Count']):
worksheet.set_column(
idx, idx, 15,
workbook.add_format({'align': 'right', 'num_format': '#,##0'})
)
else:
worksheet.set_column(
idx, idx, 15,
workbook.add_format({'align': 'right', 'num_format': '#,##0.0'})
)
# Header formatting
header_format = workbook.add_format(
{
'bold': True,
'text_wrap': True,
'align': 'center',
'valign': 'top'
}
)
worksheet.write(0, 0, output_df.columns.names[0], header_format)
worksheet.write(1, 0, output_df.columns.names[1], header_format)
for i, col in enumerate(output_df.columns):
worksheet.write(0, i + 1, col[0], header_format)
worksheet.write(1, i + 1, col[1], header_format)
# Coloring the DataFrames
colors = [
'FFB6C1',
'FFD700',
'7CFC00',
'00FFFF'
]
current_col = 1
for df, color in zip(dfs, colors):
# Non-blank cell coloring
worksheet.conditional_format(
0, current_col, df.shape[0] + len(level_names), current_col + len(df.columns) - 1,
{
'type': 'no_blanks',
'format': workbook.add_format({'bg_color': color})
}
)
# Blank cell coloring
worksheet.conditional_format(
0, current_col, df.shape[0] + len(level_names), current_col + len(df.columns) - 1,
{
'type': 'blanks',
'format': workbook.add_format({'bg_color': color})
}
)
current_col += len(df.columns)
return output_df