Source code for BharatFinTrack.analyzer

import typing
import pandas
from .helper import Helper


class Analyzer:

    '''
    Provide functionality for analyzing data.
    '''

    def sort_equity_index_close(
        self,
        csv_file: str,
        excel_file: str,
    ) -> pandas.DataFrame:

        '''
        Sort the DataFrame in descending order by equity index closing values.

        Parameters
        ----------
        csv_file : str
            Path to the input CSV file generated by either
            :meth:`BharatFinTrack.NSETRI.download_equity_close` or
            :meth:`BharatFinTrack.NSEPRI.download_equity_close`.

        excel_file : str
            Path to an Excel file to save the output DataFrame.

        Returns
        -------
        DataFrame
            DataFrame sorted in descending order based on the index closing
            value column, with the 'ID' column removed.
        '''

        # Check static type of input variable origin.
        # Keep this call first so that locals() holds only the call arguments.
        Helper()._validate_variable_origin_static_type(
            vars_types=typing.get_type_hints(
                obj=self.sort_equity_index_close
            ),
            vars_values=locals()
        )

        # Validate output file path
        Helper()._validate_file_path(
            input_file=excel_file,
            input_ext='.xlsx'
        )

        # Read DataFrame
        df = Helper()._csv_date_format(
            csv_file=csv_file,
            date_cols=['Base Date', 'Close Date']
        )

        # Sort DataFrame
        df = df.sort_values(
            by=['Close Value'],
            ascending=[False]
        ).reset_index(drop=True)
        df = df.drop(
            columns=['ID']
        )

        # Write DataFrame to the Excel file
        with pandas.ExcelWriter(
            path=excel_file,
            engine='xlsxwriter'
        ) as excel_writer:
            df.to_excel(
                excel_writer=excel_writer,
                index=False
            )
            workbook = excel_writer.book
            worksheet = excel_writer.sheets['Sheet1']
            header_format = workbook.add_format(
                {'bold': True, 'align': 'center'}
            )
            for idx, col in enumerate(df.columns):
                # The index name column needs a wider cell than the others
                if col == 'Index Name':
                    worksheet.set_column(idx, idx, 60)
                else:
                    worksheet.set_column(idx, idx, 15)
                # Rewrite the header cell with the bold and centered format
                worksheet.write(0, idx, col, header_format)

        return df

    def correction_recovery_cycles(
        self,
        csv_file: str,
        excel_file: str,
        minimum_gain: int | float = 10,
        mult_correction: int | float = 2.5,
        mult_recovery: int | float = 10
    ) -> pandas.DataFrame:

        '''
        Identify significant turning points in the security's historical
        performance, focusing on consecutive corrections and recoveries.
        A minimum percentage gain is required between two consecutive
        recoveries to qualify as significant. The function also calculates
        the frequency of corrections (declines from a peak to the next trough)
        and recoveries (gains from a trough to the next peak), applying
        specified multipliers to filter the movements. The output offers a
        comprehensive overview of the security's behavior over time.

        Parameters
        ----------
        csv_file : str
            Path to the CSV file obtained from
            :meth:`BharatFinTrack.NSETRI.download_daily_data`.

        excel_file : str
            Path to an Excel file to save the output DataFrame.

        minimum_gain : float, optional
            The minimum percentage gain required between two
            consecutive tops. Default is 10.

        mult_correction : float, optional
            Multiplier applied to calculate the magnitude of
            corrections (top to bottom). Default is 2.5.

        mult_recovery : float, optional
            Multiplier applied to calculate the magnitude of
            recoveries (bottom to top). Default is 10.

        Returns
        -------
        DataFrame
            DataFrame containing the analysis results, with columns indicating
            identified tops, bottoms, corrections, recoveries, and their
            respective multipliers.
        '''

        # Check static type of input variable origin.
        # Keep this call first so that locals() holds only the call arguments.
        Helper()._validate_variable_origin_static_type(
            vars_types=typing.get_type_hints(
                obj=self.correction_recovery_cycles
            ),
            vars_values=locals()
        )

        # Validate output file path
        Helper()._validate_file_path(
            input_file=excel_file,
            input_ext='.xlsx'
        )

        # Read DataFrame
        df = Helper()._csv_date_format(
            csv_file=csv_file,
            date_cols=['Date']
        )

        # Start and end dates
        start_date = df['Date'].min()
        end_date = df['Date'].max()

        # Lower bound of every monthly bucket: the start date followed by
        # each month-start date up to the end date (duplicates removed)
        month_starts = pandas.date_range(
            start=start_date,
            end=end_date,
            freq='MS'
        ).date
        lower_bounds = list(
            pandas.Series([start_date] + list(month_starts)).unique()
        )
        # Every bucket ends just before the next one begins; the last bucket
        # is open-ended so that the row of the end date itself is included
        upper_bounds = lower_bounds[1:] + [None]

        # DataFrame of monthly open, high, low and close value
        month_df = pandas.DataFrame()
        row = 0
        for lower, upper in zip(lower_bounds, upper_bounds):
            in_bucket = df['Date'] >= lower
            if upper is not None:
                in_bucket = in_bucket & (df['Date'] < upper)
            idx_df = df[in_bucket]
            # A bucket without any record cannot contribute a monthly row
            if idx_df.empty:
                continue
            month_df.loc[row, 'Date'] = idx_df.iloc[0, 0]
            month_df.loc[row, 'Open'] = idx_df.iloc[0, 1]
            month_df.loc[row, 'High'] = idx_df.iloc[:, 1].max()
            month_df.loc[row, 'Low'] = idx_df.iloc[:, 1].min()
            month_df.loc[row, 'Close'] = idx_df.iloc[-1, -1]
            row += 1

        # Summary
        summary = {}
        summary['Start date'] = start_date
        summary['End date'] = end_date
        date_diff = Helper()._date_difference(
            start_date=month_df['Date'].iloc[0],
            end_date=end_date
        )
        summary['Age'] = f"{date_diff['years']}Y-{date_diff['months']}M-{date_diff['days']}D"
        total_years = date_diff['years'] + (date_diff['months'] / 12) + (date_diff['days'] / 365)
        summary['Open'] = month_df.iloc[0, 1]
        summary['High(H)'] = month_df.iloc[:, 2].max()
        summary['Close(C)'] = month_df.iloc[-1, -1]
        # The annualized growth rate is undefined for a zero-length time span
        if total_years > 0:
            summary['CAGR(%)'] = 100 * (pow(summary['Close(C)'] / summary['Open'], 1 / total_years) - 1)
        else:
            summary['CAGR(%)'] = float('nan')
        summary['Correction(%)'] = 100 * (month_df.iloc[-1, -1] - summary['High(H)']) / summary['High(H)']

        # Top value DataFrame with minimum gain
        top_rows = []
        last_row = month_df.shape[0] - 1
        for i in range(month_df.shape[0]):
            high = month_df.iloc[i, 2]
            # A month followed by a higher high is not a top
            if i != last_row and month_df.iloc[i + 1, 2] > high:
                continue
            if top_rows:
                # Later tops must gain enough over the previously accepted top
                previous_high = top_rows[-1].iloc[0, 2]
                gain = 100 * (high - previous_high) / previous_high
                is_top = gain >= minimum_gain
            else:
                # The first candidate is always accepted
                is_top = True
            if is_top:
                top_rows.append(month_df.iloc[i, :].to_frame().T)
        top_df = pandas.concat(top_rows)
        top_df = top_df.drop(columns=['Open', 'Low', 'Close']).reset_index(drop=True)
        top_df = top_df.rename(
            columns={
                'Date': 'Month-H',
                'High': 'Value-H'
            }
        )
        top_df['Gain-H(%)'] = 100 * top_df['Value-H'].pct_change(fill_method=None)

        # Bottom value DataFrame
        bottom_rows = []
        if len(top_df) == 1:
            # Single top: the bottom is the most recent overall lowest month
            low_df = month_df[month_df['Low'] == month_df['Low'].min()].iloc[-1, :].to_frame().T
            bottom_rows.append(low_df)
        else:
            # One bottom per top: the most recent lowest month from that top
            # up to, but excluding, the next top. The window of the last top
            # is open-ended so that it can never be empty.
            top_dates = top_df.iloc[:, 0].tolist()
            next_top_dates = top_dates[1:] + [None]
            for top_date, next_top_date in zip(top_dates, next_top_dates):
                in_window = month_df.iloc[:, 0] >= top_date
                if next_top_date is not None:
                    in_window = in_window & (month_df.iloc[:, 0] < next_top_date)
                td_df = month_df[in_window]
                low_df = td_df[td_df['Low'] == td_df['Low'].min()].iloc[-1, :].to_frame().T
                bottom_rows.append(low_df)
        bottom_df = pandas.concat(bottom_rows)
        bottom_df = bottom_df.drop(columns=['Open', 'High', 'Close']).reset_index(drop=True)
        bottom_df = bottom_df.rename(
            columns={
                'Date': 'Month-L',
                'Low': 'Value-L'
            }
        )

        # Combine top and bottom DataFrames
        tb_df = pandas.concat([top_df, bottom_df], axis=1)
        tb_df['Gain-L(%)'] = 100 * tb_df['Value-L'].pct_change(fill_method=None)

        # Counting months of correction
        correction_time = [
            Helper()._date_difference(
                start_date=sd,
                end_date=ed
            ) for sd, ed in zip(tb_df['Month-H'], tb_df['Month-L'])
        ]
        correction_month = [
            ct['years'] * 12 + ct['months'] for ct in correction_time
        ]
        tb_df['Month(H-L)'] = correction_month
        tb_df['H-L(%)'] = 100 * (tb_df['Value-L'] - tb_df['Value-H']) / tb_df['Value-H']

        # Counting months of recovery
        recovery_time = [
            Helper()._date_difference(
                start_date=sd,
                end_date=ed
            ) for sd, ed in zip(tb_df['Month-L'].shift(periods=1), tb_df['Month-H'])
        ]
        recovery_month = [
            rt['years'] * 12 + rt['months'] for rt in recovery_time
        ]
        tb_df['Month(L-H)'] = recovery_month
        previous_low = tb_df['Value-L'].shift(periods=1)
        tb_df['L-H(%)'] = 100 * (tb_df['Value-H'] - previous_low) / previous_low
        # The first top has no preceding bottom, so its recovery month count
        # is replaced by the (missing) recovery percentage of the same row
        tb_df.iloc[0, -2] = tb_df.iloc[0, -1]

        # Counting correction
        count_correction = mult_correction * (tb_df['H-L(%)'] / mult_correction).apply(
            lambda x: round(x) if pandas.notnull(x) else x
        )
        correction_df = count_correction.value_counts().to_frame().reset_index()
        correction_df['Count(%)'] = 100 * correction_df['count'] / correction_df['count'].sum()
        correction_df = correction_df.sort_values(
            by='H-L(%)',
            ascending=[True],
            ignore_index=True
        )
        correction_df = correction_df.rename(
            columns={
                'H-L(%)': f'H-L({-mult_correction}%)',
                'count': 'Count'
            }
        )

        # Counting recovery
        count_recovery = mult_recovery * (tb_df['L-H(%)'] / mult_recovery).apply(
            lambda x: round(x) if pandas.notnull(x) else x
        )
        recovery_df = count_recovery.value_counts().to_frame().reset_index()
        recovery_df['Count(%)'] = 100 * recovery_df['count'] / recovery_df['count'].sum()
        recovery_df = recovery_df.sort_values(
            by='L-H(%)',
            ascending=[True],
            ignore_index=True
        )
        recovery_df = recovery_df.rename(
            columns={
                'L-H(%)': f'L-H({mult_recovery}%)',
                'count': 'Count'
            }
        )

        # Summary DataFrame
        summary['Recent Low(L)'] = bottom_df.iloc[-1, -1]
        summary['Recovery(%)'] = round(100 * (month_df.iloc[-1, -1] - summary['Recent Low(L)']) / summary['Recent Low(L)'], 1)
        summary_df = pandas.DataFrame(
            data=summary.values(),
            index=summary.keys()
        ).reset_index()
        summary_df.columns = [
            'Feature',
            'Value'
        ]

        # Concatenate dataframes
        dfs = [
            summary_df,
            tb_df,
            correction_df,
            recovery_df
        ]
        level_names = [
            'Analysis',
            'Column name'
        ]
        # NOTE: the misspelled key 'Countring correction' is kept unchanged
        # because it is a column label of the returned DataFrame.
        output_df = pandas.concat(
            objs=dfs,
            axis=1,
            keys=[
                'Summary',
                'Correction and recovery(Minimum gain={}%)'.format(minimum_gain),
                'Countring correction',
                'Counting recovery',
            ],
            names=level_names
        )

        # Saving DataFrames
        with pandas.ExcelWriter(
            path=excel_file,
            engine='xlsxwriter'
        ) as excel_writer:
            output_df.to_excel(
                excel_writer=excel_writer,
                index_label=['Row number']
            )
            workbook = excel_writer.book
            worksheet = excel_writer.sheets['Sheet1']
            worksheet.set_column(0, 0, 15, workbook.add_format({'align': 'center'}))
            # Sheet column 0 holds the row index, so data columns start at 1
            for idx, col in enumerate(output_df.columns, start=1):
                if col[1] == 'Feature':
                    worksheet.set_column(
                        idx, idx, 15,
                        workbook.add_format({'align': 'left'})
                    )
                elif col[1].startswith(('Month', 'Count')):
                    worksheet.set_column(
                        idx, idx, 15,
                        workbook.add_format({'align': 'right', 'num_format': '#,##0'})
                    )
                else:
                    worksheet.set_column(
                        idx, idx, 15,
                        workbook.add_format({'align': 'right', 'num_format': '#,##0.0'})
                    )
            # Header formatting
            header_format = workbook.add_format(
                {
                    'bold': True,
                    'text_wrap': True,
                    'align': 'center',
                    'valign': 'top'
                }
            )
            worksheet.write(0, 0, output_df.columns.names[0], header_format)
            worksheet.write(1, 0, output_df.columns.names[1], header_format)
            for i, col in enumerate(output_df.columns):
                worksheet.write(0, i + 1, col[0], header_format)
                worksheet.write(1, i + 1, col[1], header_format)
            # Coloring the DataFrames
            colors = [
                'FFB6C1',
                'FFD700',
                '7CFC00',
                '00FFFF'
            ]
            current_col = 1
            for section_df, color in zip(dfs, colors):
                first_col = current_col
                last_col = current_col + len(section_df.columns) - 1
                last_sheet_row = section_df.shape[0] + len(level_names)
                # Non-blank cell coloring
                worksheet.conditional_format(
                    0, first_col, last_sheet_row, last_col,
                    {
                        'type': 'no_blanks',
                        'format': workbook.add_format({'bg_color': color})
                    }
                )
                # Blank cell coloring
                worksheet.conditional_format(
                    0, first_col, last_sheet_row, last_col,
                    {
                        'type': 'blanks',
                        'format': workbook.add_format({'bg_color': color})
                    }
                )
                current_col += len(section_df.columns)

        return output_df

    def extract_data_between_dates(
        self,
        input_csv: str,
        start_date: str,
        end_date: str,
        output_csv: typing.Optional[str] = None
    ) -> pandas.DataFrame:

        '''
        Extract historical daily closing values from downloaded data
        between the given start and end dates, both inclusive.

        Parameters
        ----------
        input_csv : str
            Path to the CSV file generated by
            :meth:`BharatFinTrack.NSETRI.download_daily_data`.

        start_date : str
            Start date in the format 'DD-MMM-YYYY'.

        end_date : str
            End date in the format 'DD-MMM-YYYY'.

        output_csv : str, optional
            Path to the CSV file where the filtered data will be saved.
            Default is None, in which case no file is written.

        Returns
        -------
        DataFrame
            DataFrame containing the daily closing values
            between the specified dates.
        '''

        # Check static type of input variable origin.
        # Keep this call first so that locals() holds only the call arguments.
        Helper()._validate_variable_origin_static_type(
            vars_types=typing.get_type_hints(
                obj=self.extract_data_between_dates
            ),
            vars_values=locals()
        )

        # Validate end date is later than start date
        date_s, date_e = Helper()._date_end_later_start(
            start_date=start_date,
            end_date=end_date
        )

        # DataFrame
        df = Helper()._csv_date_format(
            csv_file=input_csv,
            date_cols=['Date']
        )

        # Extract values
        df = df[(df['Date'] >= date_s) & (df['Date'] <= date_e)].reset_index(drop=True)

        # Validate non-empty DataFrame
        Helper()._df_not_empty(
            df=df,
            error_str='No records found between the specified start and end dates'
        )

        # Save the DataFrame
        if output_csv is not None:
            # Validate output file path
            Helper()._validate_file_path(
                input_file=output_csv,
                input_ext='.csv'
            )
            # Write DataFrame to the CSV file
            Helper()._df_date_to_csv(
                df=df,
                csv_file=output_csv,
                date_cols=['Date']
            )

        return df