Source code for chimcla.util_step_history_from_logfile

"""
This script is used to evaluate the logfile to create the step history images.

example call:
python step_history_from_logfile.py -l ~/mnt/XAI-DIA-gl/Carsten/logs/classifier-2023-07-10_since_2023-06-26.log

Not yet included in cli.py.

"""

import os
import re
from datetime import datetime as dtm
from sortedcontainers import SortedDict
import argparse
import glob
from typing import Tuple

import pandas as pd
from tqdm import tqdm

import cv2
import numpy as np
import matplotlib.pyplot as plt


# this is only for debugging and can be removed in production
# (`activate_ips_on_exception` is called at import time; `IPS` is used at the end of the csv-modes)
from ipydex import activate_ips_on_exception, IPS
activate_ips_on_exception()

# module-level accumulator for the csv-mode:
# `MainManager._create_combined_image_csv` adds one column (named like the image) per processed image
df_csv = pd.DataFrame()


# LOG_FILE_PATH = f"{os.environ.get('HOME')}/mnt/XAI-DIA-gl/Carsten/classifier.log"

# fname = "classifier-2023-07-10_since_2023-06-26.log"
# LOG_FILE_PATH = f"{os.environ.get('HOME')}/mnt/XAI-DIA-gl/Carsten/logs/{fname}"

def load_lines(logfile_path):
    """
    Read the log file and return its content as a list of lines.

    :param logfile_path:    path of the log file
    :return:                list of str (one entry per line, line endings are kept)
    """
    with open(logfile_path) as logfile:
        return list(logfile)
# cache of the last evaluation per regex
# maps: regex -> (evaluated raw_lines object, matching lines, indices of the matching lines)
line_cache = {}


def get_relevant_lines(raw_lines, regex=None, return_indices=False):
    """
    Select those lines which match a regular expression (matching starts at the begin of each line).

    The result is cached per regex. A cached result is only reused if the very same
    `raw_lines` object is passed again; the cache serves both return variants.

    :param raw_lines:       sequence of str; the lines to be filtered
    :param regex:           str or None; regular expression (default: lines starting with "2023-0")
    :param return_indices:  bool; if True return the indices (w.r.t. `raw_lines`) of the
                            matching lines instead of the lines themselves
    :return:                list of matching lines or list of their indices
    """
    if not regex:
        # use https://pythex.org/ to test better regexes
        regex = "^2023-0.*$"

    cache_result = line_cache.get(regex)
    if cache_result is not None and cache_result[0] is raw_lines:
        _, res, relevant_idcs = cache_result
    else:
        rec = re.compile(regex)
        res = []
        relevant_idcs = []
        for i, line in enumerate(raw_lines):
            if rec.match(line):
                res.append(line)
                relevant_idcs.append(i)

        # store lines *and* indices such that a later call with a different value of
        # `return_indices` still gets the right kind of result
        line_cache[regex] = (raw_lines, res, relevant_idcs)

    if return_indices:
        return relevant_idcs
    return res
class TimeDeltaManager:
    """
    Model of the timing information of the conveyor belt, built from the time stamps
    of the passed log lines.

    The main purpose of this class is the method `get_position_time_vector`.
    """

    def __init__(self, relevant_lines):
        """
        :param relevant_lines:  sequence of str; relevant lines from the log file, like
            "2023-06-26 01:34:54,885 - XAI-Server - INFO - Takt Handler activated: Value = stop (False)"
        """
        self.relevant_lines = relevant_lines

        # pairs of (time stamp of the line, index of the line)
        stamp_index_pairs = [
            (dtm.fromisoformat(line.split(" - XAI")[0].replace(",", ".")), idx)
            for idx, line in enumerate(relevant_lines)
        ]

        # a SortedDict always keeps its keys in order (regardless of the insertion order)
        ordered_stamps = SortedDict(stamp_index_pairs)
        self.datetime_objects = np.array(ordered_stamps.keys())

        # datetime of the first relevant log line
        self.step0 = self.datetime_objects[0]

        # time difference (in seconds) of every event w.r.t. self.step0
        offsets = self.datetime_objects[:] - self.step0
        self.time_deltas_to_step0 = np.array([offset.total_seconds() for offset in offsets])

        # consistency check: the first event has no offset to itself
        assert self.time_deltas_to_step0[0] == 0

        # time difference (in seconds) between each event and its predecessor
        self.time_deltas = np.diff(self.time_deltas_to_step0)

    def get_position_time_vector(self, end_time:str, N: int = 1400, return_abs_times=False):
        """
        Return the durations of the last `N` steps which were logged before `end_time`.

        :param end_time:            datetime or str like "2023-06-27 12:59:58,750"
                                    (The comma might be there for historical reasons)
        :param N:                   int; number of steps of the conveyor belt
        :param return_abs_times:    bool; if True additionally return the iso-formatted
                                    time stamps belonging to the durations
        :return:                    array of length N (missing leading values are nan) or,
                                    if `return_abs_times` is True, the tuple
                                    (array, list of str) where missing time stamps are "NaT"
        """
        if isinstance(end_time, str):
            end_time = dtm.fromisoformat(end_time.replace(",", "."))

        # index of the first logged event which happened after end_time
        # (the index refers to the events represented by the relevant lines)
        later_event_idcs = np.where(end_time < self.datetime_objects)[0]
        end_idx = later_event_idcs[0]

        # index of the event which happened N steps before the end_idx-event
        first_idx = end_idx - N

        # If end_time is too close to the begin of the log file there are less than
        # N events available. The result nevertheless has length N:
        # the missing places are filled with nan-values (`patch` is their number).
        if first_idx < 0:
            start_idx, patch = 0, -first_idx
            assert patch > 0
        else:
            start_idx, patch = first_idx, 0

        station_time_vector = np.concatenate(([np.nan]*patch, self.time_deltas[start_idx:end_idx]))

        if not return_abs_times:
            return station_time_vector

        # +1 because the indices refer to deltas
        logged_times = list(self.datetime_objects[start_idx+1:end_idx+1])
        abs_times_str = ["NaT"]*patch + [stamp.isoformat() for stamp in logged_times]
        return station_time_vector, abs_times_str
class Container:
    """
    Plain namespace object; its attributes are assigned by the code which creates it.
    """
def get_img_filenames_from_dir(image_dir):
    """
    Collect the paths of all png- and jpg-files located directly in `image_dir`.

    :param image_dir:   path of an existing directory
    :return:            Container (see `_get_fpath_container_from_path_list`)
    """
    assert os.path.isdir(image_dir)

    # one common, alphabetically sorted list of both file types
    path_list = sorted(glob.glob(f"{image_dir}/*.png") + glob.glob(f"{image_dir}/*.jpg"))

    return _get_fpath_container_from_path_list(path_list)
def get_img_filenames_from_file(fpaths_file):
    """
    Read image paths from a text file (one path per line).

    :param fpaths_file:     path of the text file
    :return:                Container (see `_get_fpath_container_from_path_list`)
    """
    with open(fpaths_file) as fpaths_fp:
        content = fpaths_fp.read()

    return _get_fpath_container_from_path_list(content.split("\n"))
def _get_fpath_container_from_path_list(path_list) -> Container:
    """
    Create a Container from a list of image paths.

    The file names are expected to look like '2023-06-26_06-16-09_C50' (plus extension),
    optionally preceded by a prefix which ends at the first underscore
    (like 'S000056_2023-...').

    :param path_list:   sequence of str; empty entries are ignored
    :return:            Container with the attributes
                        - fpaths: list of the (non-empty) paths
                        - time_stamps: list of tuples (file name without prefix and extension,
                          time stamp in iso format like '2023-06-26 06:16:09')
    """
    res = Container()
    res.time_stamps = []
    res.fpaths = []

    for fpath in filter(None, path_list):
        res.fpaths.append(fpath)

        # something like '2023-06-26_06-16-09_C50'
        fname, _ext = os.path.splitext(os.path.basename(fpath))

        # drop the prefix if the name does not directly start with the date
        if not fname.startswith("202"):
            fname = fname[fname.index("_") + 1:]
        assert fname.startswith("202")

        # convert the file name into iso date time format
        date_part, time_part, _ = fname.split("_")
        res.time_stamps.append((fname, f"{date_part} {time_part.replace('-', ':')}"))

    return res
def get_img_filenames_from_logfile(all_lines):
    """
    Extract the paths of those images which are mentioned in the log file.

    :param all_lines:   sequence of str; all lines of the log file
    :return:            Container with the attributes
                        - fpaths: list of the image paths
                        - time_stamps: list of tuples (file name without extension,
                          time stamp in iso format like '2023-06-27 12:59:41')
    """
    # the lines of interest look like:
    # DEBUG - Image: /home/sascha/Devel/xaidia-server/Classifier/images/2023-06-27_12-59-41_C50.png,
    regex_str = ".*DEBUG - Image: /home/sascha/Devel/xaidia-server/Classifier/images.*"
    img_lines0 = get_relevant_lines(all_lines, regex=regex_str, return_indices=False)

    res = Container()
    res.time_stamps = []
    res.fpaths = []

    for img_line in img_lines0:
        # the path is located between "Image:" and the next comma
        after_marker = img_line.split("Image:")[1]
        fpath = after_marker.split(",")[0].strip()
        res.fpaths.append(fpath)

        # something like '2023-06-26_06-16-09_C50'
        fname = os.path.splitext(os.path.basename(fpath))[0]

        # convert the file name into iso date time format
        date_part, time_part, _ = fname.split("_")
        res.time_stamps.append((fname, f"{date_part} {time_part.replace('-', ':')}"))

    if 0:
        # write all filenames to a text file (development switch, normally not executed)
        dirname = "output"
        with open(os.path.join(dirname, "_fpaths.txt"), "w") as fp:
            fp.write("\n".join(res.fpaths))

    return res
def plot_histogram_of_time_deltas(time_deltas):
    """
    Show a histogram of the time deltas. This function is useful for debugging.

    :param time_deltas:     numpy array of time differences (the array itself is not changed)
    """
    # collapse all values >= 10 to 10 (for better visualization); this creates a new array
    clipped_deltas = np.where(time_deltas >= 10, 10, time_deltas)

    # bin width 0.1 up to 9, one final wide bin for everything above
    bin_edges = [*np.arange(0, 1, .1), *np.arange(1, 9, .1), 10.1]

    plt.hist(clipped_deltas, bins=bin_edges, rwidth=0.8, log=False)
    plt.show()
class MainManager:
    """
    Command line front end of this module.

    On creation the command line arguments are parsed and the log file is loaded and
    filtered. `main` then dispatches to one of the modes (csv-mode, db-mode or the
    default: creation of position-time-images).
    """

    def __init__(self):
        self.parse_args()
        self.load_logfile()

    def parse_args(self):
        """
        Parse the command line arguments and store the result in `self.args`.
        """
        parser = argparse.ArgumentParser(
            prog='chimcla_step_history',
            description='evaluate the conveyor belt history of chocolate images',
        )
        parser.add_argument('--logfile', "-l", help="the logfile to be evaluated", required=True)
        parser.add_argument("--image-dir", "-i", help="the image dir to be evaluated")
        parser.add_argument("--fpaths", "-p", help="text file containing paths")

        # This is intended to store all relevant information in a database
        # to be easily accessible from other scripts
        parser.add_argument("--db-mode", "-dm", help="start in database-mode", action="store_true")

        # this mode iterates over directories given by PATTERN and processes csv files
        # (ignoring entries based on CRIT_SCORE_LIMIT)
        parser.add_argument(
            "--csv-mode", "-cm", help="start in csv-mode", nargs=2, metavar=("PATTERN", "CRIT_SCORE_LIMIT")
        )

        self.args = parser.parse_args()

    def load_logfile(self):
        """
        Load the log file and determine the lines which mark the steps of the conveyor belt.

        Sets the attributes `all_lines`, `relevant_lines0`, `tdm0` (all candidate lines) and
        `relevant_lines1`, `relevant_idcs1`, `tdm1` (candidates which follow the previously
        accepted line with a realistic delay).

        :raises ValueError:     if the log file does not contain any candidate line
        """
        # get lines of log file
        self.all_lines = load_lines(self.args.logfile)

        # regex_str = r".*Value = moving \(True\).*"
        regex_str = r".*Value = stop \(False\).*"

        # get indices of relevant lines
        relevant_idcs0 = get_relevant_lines(self.all_lines, regex=regex_str, return_indices=True)
        if len(relevant_idcs0) == 0:
            msg = f"no line matching {regex_str!r} found in {self.args.logfile}"
            raise ValueError(msg)

        # get the actual lines (corresponding to these indices)
        self.relevant_lines0 = np.array(self.all_lines)[relevant_idcs0]

        # create first auxiliary manager instance
        self.tdm0 = TimeDeltaManager(self.relevant_lines0)

        # here ↑ we interpret *every* 'Value = stop (False)'-line as relevant line.
        # However, some of these lines come with unrealistic little delay after each other.
        # The next step is to determine the limit time, i.e. the minimal time between two events
        # which is considered realistic. This is done by looking at the histogram.
        # The working hypothesis thereby: unrealistic short intervals should occur only seldom.
        # Also, we know that the usual interval is about 3s.

        if 0:
            # histogram for decision where to set the limit value
            # (this block normally should not be executed)
            plot_histogram_of_time_deltas(self.tdm0.time_deltas)
            exit()

        # by looking at the histogram this value was chosen:
        delta_limit = 2.6

        # sort out those lines which are too short after the previous step
        self.relevant_lines1 = [self.relevant_lines0[0]]
        self.relevant_idcs1 = [relevant_idcs0[0]]  # this is to save the indices of the original log file

        # time which passed since the last accepted line
        dt_saved = 0

        # Iterate over the time deltas and discard a log line if it comes too short after the
        # last accepted one. `time_deltas[k]` is the delay between line k and line k+1, i.e.
        # it belongs to line k+1. That is why the enumeration starts at 1.
        for i, dt in enumerate(self.tdm0.time_deltas, start=1):
            if dt_saved + dt >= delta_limit:
                self.relevant_lines1.append(self.relevant_lines0[i])
                self.relevant_idcs1.append(relevant_idcs0[i])
                dt_saved = 0
            else:
                dt_saved += dt

        self.tdm1 = TimeDeltaManager(self.relevant_lines1)

        if 0:
            plot_histogram_of_time_deltas(self.tdm1.time_deltas)
            exit()

        # Consistency check. Notes:
        # - there is nothing to check if only one line was accepted
        # - the loop above accepts delays which are equal to the limit -> `>=`
        # - the small tolerance covers floating point round-off (accumulated vs. directly
        #   calculated delay)
        if len(self.tdm1.time_deltas) > 0:
            assert min(self.tdm1.time_deltas) >= delta_limit - 1e-6

    def main(self):
        """
        Run the mode which was selected via the command line arguments.
        """
        if self.args.csv_mode:
            # self.handle_csv_mode()
            self.handle_csv_mode_count_value()
        elif self.args.db_mode:
            self.create_db_with_filenames()
        else:
            self.create_position_time_images()

    def _collect_csv_columns(self):
        """
        Common first part of both csv-modes: evaluate the arguments, create the result
        directory and add one column per relevant image to the module level `df_csv`.
        """
        self.pattern, self.crit_score_limit = self.args.csv_mode
        relevant_img_df = self._get_relevant_images()

        assert self.pattern.count("*") == 1
        self.result_dir = self.pattern.replace("*", "_all")
        os.makedirs(self.result_dir, exist_ok=True)

        # normally for performance reasons iteration over pandas df rows is not recommended
        # here, simplicity matters more
        for img_row in tqdm(relevant_img_df.itertuples(index=False)):
            # self._create_combined_image(img_row)
            self._create_combined_image_csv(img_row)

    @staticmethod
    def _add_dwell_time_statistics(df):
        """
        Add the columns 'dt_sum', 'dt_mean' and 'dt_med' (row-wise sum, mean and median of
        the numeric columns) to `df` (in place).

        All three values are calculated before the first of them is inserted. Thus they
        refer to the columns which are present on entry only (and not to each other).

        :param df:  pandas.DataFrame; one column per image, one row per position
        :return:    the same (modified) DataFrame
        """
        dt_sum = df.sum(axis=1, numeric_only=True)
        dt_mean = df.mean(axis=1, numeric_only=True)
        dt_med = df.median(axis=1, numeric_only=True)

        df['dt_sum'] = dt_sum
        df['dt_mean'] = dt_mean
        df['dt_med'] = dt_med
        return df

    @staticmethod
    def _count_long_delays(df, delay_threshold):
        """
        Mark all values of `df` which reach `delay_threshold` and count them per row.
        Every such value is reported via print.

        :param df:                  pandas.DataFrame with numeric columns (not changed)
        :param delay_threshold:     number; limit in seconds
        :return:    new DataFrame (same index and columns) with 1.0 where the value
                    is >= delay_threshold and 0.0 elsewhere (also for nan);
                    additional column 'count_sum': row-wise sum of these marks
        """
        values = df.to_numpy()
        exceeds = values >= delay_threshold

        # np.nonzero delivers the indices row by row
        for row, col in zip(*np.nonzero(exceeds)):
            print("Delay > {}s: val = {}, row = {}, col = {}".format(delay_threshold, values[row, col], row, col))

        df_count = pd.DataFrame(exceeds.astype(float), index=df.index, columns=df.columns)

        # calculate sum of critical times for each row
        df_count['count_sum'] = df_count.sum(axis=1, numeric_only=True)
        return df_count

    def handle_csv_mode(self):
        """
        csv-mode (variant 1): write the dwell times of all relevant images together with
        their row-wise sum, mean and median to results.csv / results.xlsx and plot the sum.

        Ends with an interactive shell (`IPS`) and `exit()`.
        """
        self._collect_csv_columns()

        # calculate sum, mean and median of times for each row -> dwell time per position
        self._add_dwell_time_statistics(df_csv)

        print(df_csv)

        df_csv.to_csv("results.csv", sep='\t', index=True, index_label='idx')
        df_csv.to_excel("results.xlsx", index_label='idx')

        # read csv data and plot it
        df_sum = pd.read_csv('results.csv', sep='\t', index_col=0)
        df_sum["dt_sum"].plot(kind='bar', y='dwell_time')

        # convert to numpy array
        # alternative: df_sum["dt_abs"].plot(kind = 'bar', y = 'dwell_time', xticks = range(0,1500,100))
        np_sum = df_sum.dt_sum.to_numpy()
        plt.plot(np_sum)
        plt.clf()

        # but: plot only one times doesn't format the graphic correctly
        np_sum = df_sum.dt_sum.to_numpy()
        plt.plot(np_sum)
        plt.show()

        IPS()
        exit()

    def handle_csv_mode_count_value(self):
        """
        csv-mode (variant 2): like `handle_csv_mode` the dwell times and their statistics are
        written to results.csv / results.xlsx. Additionally it is counted (per position) how
        often the dwell time reaches the delay threshold. The counts are written to
        results_count.csv / results_count.xlsx and plotted.

        Ends with an interactive shell (`IPS`) and `exit()`.
        """
        self._collect_csv_columns()

        # important here: copy before calculating additional columns
        df_raw = df_csv.copy()

        # calculate sum, mean and median of times for each row -> dwell time per position
        self._add_dwell_time_statistics(df_csv)

        df_csv.to_csv("results.csv", sep='\t', index=True, index_label='idx')
        df_csv.to_excel("results.xlsx", index_label='idx')

        ########################################################################################
        # count variable - increments if delay is greater than x seconds
        ########################################################################################
        delay_threshold = 10

        df_count_rows, df_count_cols = df_raw.shape
        print("Rows = {}, Cols = {}".format(df_count_rows, df_count_cols))

        df_count = self._count_long_delays(df_raw, delay_threshold)

        df_count.to_csv("results_count.csv", sep='\t', index=True, index_label='idx')
        df_count.to_excel("results_count.xlsx", index_label='idx')

        # read csv data and plot it
        df_plot_count = pd.read_csv('results_count.csv', sep='\t', index_col=0)
        df_plot_count["count_sum"].plot(kind='bar', y='count_sum')

        # convert to numpy array
        np_count = df_plot_count.count_sum.to_numpy()
        plt.plot(np_count)
        plt.clf()

        # but: plot only one times doesn't format the graphic correctly
        np_count = df_count.count_sum.to_numpy()
        plt.plot(np_count)
        plt.show()

        IPS()
        exit()

    def _get_station_time_vector(self, img_row):
        """
        Return the position time vector (see `TimeDeltaManager.get_position_time_vector`)
        for the time stamp which is encoded in `img_row.basename`
        (expected form: '2023-06-26_08-50-55_C50').
        """
        date_str, time_str, _ = img_row.basename.split("_")
        time_str = time_str.replace("-", ":")
        return self.tdm1.get_position_time_vector(f"{date_str} {time_str}")

    def _create_combined_image_csv(self, img_row):
        """
        Add the (rounded) position time vector of one image as new column to the
        module level data frame `df_csv` (for further data analysis).

        :param img_row:     row object (as yielded by `DataFrame.itertuples`);
                            used field: .basename
        """
        station_time_vector = self._get_station_time_vector(img_row).round(2)

        # the column is named like the image
        df_csv[img_row.basename] = station_time_vector.tolist()

    def _create_combined_image(self, img_row):
        """
        Create a jpg file (in `self.result_dir`) which shows the original image and,
        below, the graph of the associated position time vector.

        :param img_row:     row object (as yielded by `DataFrame.itertuples`);
                            fields: .basename, .dir, .criticality
        :raises OSError:    if the result image could not be written
        """
        station_time_vector = self._get_station_time_vector(img_row)
        _, orig_img_arr = self._get_original_file(img_row)

        # adapt the size of the graph such that it matches the original size
        ratio = 2
        orig_width = orig_img_arr.shape[1]

        # this is in inches (200dpi empirically determined)
        short_side = orig_width/200
        long_side = short_side*ratio

        # note: figsize is (width, height)
        fig = plt.figure(figsize=(long_side, short_side))
        plt.plot(station_time_vector)

        ax = plt.gca()
        ax.set_yscale('asinh', linear_width=10, base=0)
        plt.ylim(-1, 1e4)
        plt.xticks(np.arange(0, len(station_time_vector) + 100, 100))

        yticks = [0, 3, 10, 30, 100, 300, 1000, 3000]
        plt.yticks(yticks)
        ax.set_yticklabels([str(tick) for tick in yticks])
        plt.grid()

        fig_arr = self._fig_to_array(fig)
        plt.close()

        # the array is passed to cv2 functions below -> convert the channel order
        fig_arr = cv2.cvtColor(fig_arr, cv2.COLOR_RGB2BGR)

        # both images must have the same width (and number of channels) to be stacked
        assert orig_img_arr.shape[1:] == fig_arr.shape[1:]
        joint_array = np.concatenate((orig_img_arr, fig_arr))

        fname = f"S{img_row.criticality}_{img_row.basename}_Cx.jpg"
        fpath = os.path.join(self.result_dir, fname)

        # cv2.imwrite signals failure via its return value only
        if not cv2.imwrite(fpath, joint_array, [cv2.IMWRITE_JPEG_QUALITY, 98]):
            raise OSError(f"could not write image file {fpath}")

    def _get_original_file(self, img_row) -> Tuple[str, np.ndarray]:
        """
        Find and load the image file which belongs to `img_row`.

        :param img_row:     row object; used fields: .dir, .basename
        :return:            tuple (path of the file, image array as returned by cv2.imread)
        """
        pattern = os.path.join(img_row.dir, f"*{img_row.basename}*")
        flist = glob.glob(pattern)

        # the basename is expected to identify exactly one file of the directory
        assert len(flist) == 1

        fpath = flist[0]
        img_arr = cv2.imread(fpath)
        return fpath, img_arr

    def _fig_to_array(self, fig):
        """
        Render a matplotlib figure into an image array.

        :param fig:             matplotlib figure
        :return:                uint8 array of shape (height, width, 3) (RGB), where height and
                                width are those reported by `fig.canvas.get_width_height()`
        :raises ValueError:     if the rendered buffer has an unexpected length
        """
        # taken from https://stackoverflow.com/a/57988387
        fig.tight_layout(pad=0)
        plt.subplots_adjust(left=.08)
        ax = plt.gca()

        # To remove the huge white borders
        ax.margins(0)
        fig.canvas.draw()

        # NOTE(review): `tostring_rgb` is deprecated in recent Matplotlib releases
        # (`buffer_rgba` is the suggested replacement) -- confirm against the used version
        image_from_plot = np.frombuffer(fig.canvas.tostring_rgb(), dtype=np.uint8)
        dim0, dim1 = fig.canvas.get_width_height()[::-1]  # this is 450, 900

        # check dimensions
        length = len(image_from_plot)
        factor = (length / dim0 / dim1 / 3)

        # on Carstens machine: length == 1215000 (factor 1)
        # on Saschas machine: length == 4860000 (factor 4)
        if factor == 1:
            # array dimensions are as expected
            image_from_plot = image_from_plot.reshape(dim0, dim1, 3)
        elif factor == 4:
            # this can happen on displays with higher resolution
            image_from_plot = image_from_plot.reshape(dim0*2, dim1*2, 3)

            # now the image is too big -> downsample it
            # note: cv2.resize expects the dimension in order: width, height, i.e. dim1, dim0
            image_from_plot = cv2.resize(image_from_plot, (dim1, dim0), interpolation=cv2.INTER_LINEAR)
            assert image_from_plot.shape == (dim0, dim1, 3)
        else:
            msg = f"matplotlib generated an 1d image array with unexpected length ({factor=})"
            raise ValueError(msg)

        # in both accepted cases the array already has the shape (dim0, dim1, 3)
        return image_from_plot

    def _get_relevant_images(self) -> pd.DataFrame:
        """
        Collect all images whose criticality exceeds `self.crit_score_limit`.

        For every directory matching `self.pattern` (except those ending with "_all") the
        file "_criticality_list.csv" is evaluated. Directories without this file are ignored.
        `self.crit_score_limit` is converted to int.

        :return:    DataFrame with the columns 'dir', 'basename', 'criticality' (and
                    possible further columns of the csv files), sorted by descending criticality
        :raises FileNotFoundError:  if none of the directories contains the csv file
        """
        CSV_FNAME = "_criticality_list.csv"
        self.crit_score_limit = int(self.crit_score_limit)

        dirs = glob.glob(self.pattern)
        assert len(dirs) > 0, f"Could not find any directory for pattern {self.pattern}"

        selected_frames = []
        for dirpath in sorted(dirs):
            if dirpath.endswith("_all"):
                continue
            csv_fpath = os.path.join(dirpath, CSV_FNAME)
            if not os.path.isfile(csv_fpath):
                # temporarily ignore incomplete directories (they are created in parallel)
                continue

            # all lines
            df = pd.read_csv(csv_fpath)

            # drop the column of the stored index (if there is one)
            df.drop(columns=["Unnamed: 0"], inplace=True, errors="ignore")

            # looks like:
            #    Unnamed: 0                 basename  criticality
            # 0           3  2023-06-26_08-50-55_C50     40268.73
            # 1          82  2023-06-26_20-18-05_C50      2894.02
            # 2           6  2023-06-26_08-51-39_C50      2441.10
            # 3          43  2023-06-26_09-32-42_C50      1584.12
            # ...

            # only those meeting the condition (copy: the selection is modified below)
            df_selected = df[df.criticality > self.crit_score_limit].copy()
            df_selected.insert(0, "dir", dirpath)
            selected_frames.append(df_selected)

        if not selected_frames:
            msg = f"Could not find any file {CSV_FNAME} in the directories for pattern {self.pattern}"
            raise FileNotFoundError(msg)

        res = pd.concat(selected_frames, ignore_index=True)
        res.sort_values("criticality", ascending=False, inplace=True)
        return res

    def create_db_with_filenames(self):
        """
        db-mode; not implemented yet (always raises NotImplementedError).
        """
        c: Container = get_img_filenames_from_logfile(all_lines=self.all_lines)

        msg = "It is not trivial to efficiently store a map from filename to position time vectors"
        # currently we can stick with the get_img_filenames_from_file option
        raise NotImplementedError(msg)

    def create_position_time_images(self):
        """
        Default mode: for every image of interest write a plot (`<basename>_ptv.png`) and a
        table (`<basename>_tab.csv`) of its position time vector to the directory "output".

        The images of interest are taken from (in this order of precedence): the text file
        given by --fpaths, the directory given by --image-dir, the log file.
        """
        # for debugging
        # i_test = self.tdm1.get_position_time_vector("2023-06-27 12:59:58,750")

        dirname = "output"
        os.makedirs(dirname, exist_ok=True)

        # now get filenames of images of interest
        if self.args.fpaths:
            c: Container = get_img_filenames_from_file(self.args.fpaths)
        elif self.args.image_dir:
            c: Container = get_img_filenames_from_dir(self.args.image_dir)
        else:
            # those images which are present in the logfile
            c: Container = get_img_filenames_from_logfile(all_lines=self.all_lines)

        # create final results (visualization of position time vector)
        for i, (basename, ts) in enumerate(c.time_stamps):

            # quick hack to ignore the first boring images (development switch)
            if 0 and i <= 10:
                continue

            position_time_vector, abs_times_str = self.tdm1.get_position_time_vector(ts, return_abs_times=True)

            plt.plot(position_time_vector)
            plt.title(basename)
            img_fpath = os.path.join(dirname, f"{basename}_ptv.png")
            tab_fpath = os.path.join(dirname, f"{basename}_tab.csv")

            df1 = pd.DataFrame({"duration": np.round(position_time_vector, 3), "timestamp": abs_times_str})
            df1.to_csv(tab_fpath)

            plt.savefig(img_fpath)
            print(f"{img_fpath} written")
            plt.close()

            if 0 and i >= 3:
                # stop the script (useful during development)
                break
# this is executed by the cli script (see pyproject.toml)
def main():
    """
    Entry point: create the `MainManager` (which parses the command line arguments and
    loads the log file) and run the selected mode.
    """
    MainManager().main()
# obsolete but does not harm if __name__ == "__main__": main()