"""
This script is used to evaluate the logfile to create the step history images.
example call:
python step_history_from_logfile.py -l ~/mnt/XAI-DIA-gl/Carsten/logs/classifier-2023-07-10_since_2023-06-26.log
Not yet included in cli.py.
"""
import argparse
import glob
import os
import re
from datetime import datetime as dtm
from typing import Tuple

import cv2
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sortedcontainers import SortedDict
from tqdm import tqdm

# plain attribute container; used for the results of the get_img_filenames_* functions
from ipydex import Container
# this is only for debugging and can be removed in production
from ipydex import activate_ips_on_exception, IPS
# NOTE(review): runs at import time; presumably installs ipydex's interactive exception hook
# (debugging aid) -- confirm before using this module in production
activate_ips_on_exception()
# module-level table: MainManager._create_combined_image_csv adds one column per evaluated image,
# the csv-mode handlers of MainManager evaluate and export it
df_csv = pd.DataFrame()
# LOG_FILE_PATH = f"{os.environ.get('HOME')}/mnt/XAI-DIA-gl/Carsten/classifier.log"
# fname = "classifier-2023-07-10_since_2023-06-26.log"
# LOG_FILE_PATH = f"{os.environ.get('HOME')}/mnt/XAI-DIA-gl/Carsten/logs/{fname}"
def load_lines(logfile_path):
    """
    Read the log file and return its content line by line.

    :param logfile_path:    path of the log file
    :return:                list of str; every line keeps its trailing newline character
    """
    with open(logfile_path) as logfile:
        return list(logfile)
# cache used by get_relevant_lines: regex -> (raw_lines, matching_lines, matching_indices)
line_cache = {}


def get_relevant_lines(raw_lines, regex=None, return_indices=False):
    """
    Select those lines of `raw_lines` which match `regex` (matched at the start of the line).

    The result is cached per regex. A cache entry is only reused if it was created for the very
    same `raw_lines` object, and it serves both the lines and the indices.

    :param raw_lines:       sequence of str (lines of the log file)
    :param regex:           regular expression (str); default: lines starting with "2023-0"
    :param return_indices:  if True, return the indices (w.r.t. `raw_lines`) of the matching
                            lines instead of the lines themselves
    :return:                list of str or (if `return_indices` is True) list of int
    """
    if not regex:
        # use https://pythex.org/ to test better regexes
        regex = "^2023-0.*$"

    cached = line_cache.get(regex)
    if cached is not None and cached[0] is raw_lines:
        _, matching_lines, matching_idcs = cached
    else:
        pattern = re.compile(regex)
        matching_lines = []
        matching_idcs = []
        for i, line in enumerate(raw_lines):
            if pattern.match(line):
                matching_lines.append(line)
                matching_idcs.append(i)
        # keep a reference to raw_lines so that a hit can be validated by identity
        line_cache[regex] = (raw_lines, matching_lines, matching_idcs)

    if return_indices:
        return matching_idcs
    return matching_lines
class TimeDeltaManager:
    """
    This class processes and models the timing information for the conveyor belt.

    Its main purpose is the method `get_position_time_vector`.
    """

    def __init__(self, relevant_lines):
        """
        :param relevant_lines:  sequence of str containing the relevant lines of the log file,
                                i.e. entries like "2023-06-26 01:34:54,885 - XAI-Server - INFO -
                                Takt Handler activated: Value = stop (False)"
        :raises IndexError:     if `relevant_lines` is empty
        """
        self.relevant_lines = relevant_lines

        # unique timestamps in strictly increasing order (regardless of the order of the lines)
        timestamps = sorted({self._parse_timestamp(line) for line in relevant_lines})
        if not timestamps:
            raise IndexError("TimeDeltaManager needs at least one relevant log line")
        self.datetime_objects = np.array(timestamps, dtype=object)

        # store the datetime of the first relevant log line
        self.step0 = self.datetime_objects[0]

        # time difference in seconds w.r.t. self.step0
        self.time_deltas_to_step0 = np.array(
            [(ts - self.step0).total_seconds() for ts in self.datetime_objects]
        )

        # time difference in seconds between each event and its predecessor
        # (entry k refers to the events k and k+1)
        self.time_deltas = np.diff(self.time_deltas_to_step0)

    @staticmethod
    def _parse_timestamp(line):
        """Return the leading timestamp of a log line as datetime object."""
        # the logger writes a comma as decimal separator, fromisoformat needs a dot
        return dtm.fromisoformat(line.split(" - XAI")[0].replace(",", "."))

    def get_position_time_vector(self, end_time, N: int = 1400, return_abs_times=False):
        """
        Return the durations (in s) of the last `N` steps up to the first event after `end_time`.

        If less than `N` steps were logged before that event, the missing leading entries are
        filled with nan (or "NaT" for the absolute times), such that the result always has
        length `N`.

        :param end_time:    datetime or str like "2023-06-27 12:59:58,750"
                            (The comma might be there for historical reasons)
        :param N:           int; Number of steps of the conveyor belt
        :param return_abs_times:
                            if True, additionally return the iso-formatted time of the event
                            which ends the respective step
        :return:            array of length `N` or (if `return_abs_times` is True) the tuple
                            (array of length `N`, list of str of length `N`)
        :raises IndexError: if no event was logged after `end_time`
        """
        if isinstance(end_time, str):
            end_time = dtm.fromisoformat(end_time.replace(",", "."))

        # find the end_index (w.r.t. events represented by relevant lines)
        # -> index of first logged event which happened after end_time
        later_events = np.flatnonzero(self.datetime_objects > end_time)
        if later_events.size == 0:
            msg = (
                f"end_time {end_time} is not before the last logged event "
                f"({self.datetime_objects[-1]})"
            )
            raise IndexError(msg)
        end_idx = int(later_events[0])

        # index of the event which happened N steps before the end_index-event;
        # if end_time is too short after the beginning of the log file, there are less
        # than N events logged -> `patch` nan-values are inserted at the missing places
        first_idx = end_idx - N
        start_idx = max(first_idx, 0)
        patch = start_idx - first_idx

        station_time_vector = np.concatenate(
            ([np.nan] * patch, self.time_deltas[start_idx:end_idx])
        )
        if not return_abs_times:
            return station_time_vector

        # +1 because index refers to deltas
        abs_times = self.datetime_objects[start_idx + 1:end_idx + 1]
        abs_times_str = ["NaT"] * patch + [ts.isoformat() for ts in abs_times]
        return station_time_vector, abs_times_str
def get_img_filenames_from_dir(image_dir):
    """
    Collect all png and jpg files which are located directly in `image_dir`.

    :param image_dir:   path of an existing directory
    :return:            container as created by `_get_fpath_container_from_path_list`
                        (based on the sorted list of file paths)
    """
    assert os.path.isdir(image_dir), f"{image_dir} is not a directory"

    # escape the directory part, otherwise glob metacharacters in its name
    # (like "[" or "*") would be interpreted as part of the pattern
    escaped_dir = glob.escape(os.fspath(image_dir))
    path_list = sorted(
        [*glob.glob(f"{escaped_dir}/*.png"), *glob.glob(f"{escaped_dir}/*.jpg")]
    )
    return _get_fpath_container_from_path_list(path_list)
def get_img_filenames_from_file(fpaths_file):
    """
    Read image paths from a text file which contains one path per line.

    :param fpaths_file: path of the text file
    :return:            result of `_get_fpath_container_from_path_list` for these paths
    """
    with open(fpaths_file) as path_file:
        content = path_file.read()
    return _get_fpath_container_from_path_list(content.split("\n"))
def _get_fpath_container_from_path_list(path_list) -> Container:
    """
    Extract the timestamps which are encoded in the file names of `path_list`.

    The file names are expected to look like '2023-06-26_06-16-09_C50' with an optional prefix
    (like 'S000056_') and optional further '_'-separated parts at the end.

    :param path_list:   sequence of str (file paths); empty entries are ignored
    :return:            Container with the attributes
                        - fpaths: list of the non-empty paths
                        - time_stamps: list of tuples (file name without prefix and extension,
                          date time str like '2023-06-26 06:16:09')
    """
    res = Container()

    # extract timestamps from the file names
    res.time_stamps = []
    res.fpaths = []
    for fpath in path_list:
        if not fpath:
            continue
        res.fpaths.append(fpath)
        fname = os.path.splitext(os.path.split(fpath)[1])[0]  # something like '2023-06-26_06-16-09_C50'

        # convert file name into iso date time format
        # assume filename starting like 2023- etc or some prefix like S000056_2023-...
        if not fname.startswith("202"):
            idx = fname.index("_")
            fname = fname[idx+1:]
            assert fname.startswith("202"), f"unexpected file name: {fpath}"

        # only the first two parts (date and time) are relevant; there might be more than one
        # further part (e.g. '..._C50_Cx')
        p0, p1, *_ = fname.split("_")
        iso_str = f"{p0} {p1.replace('-', ':')}"
        res.time_stamps.append((fname, iso_str))

    return res
def get_img_filenames_from_logfile(all_lines, img_line_regex=None):
    """
    Collect the paths of those images which are mentioned in the log file.

    :param all_lines:       sequence of str (all lines of the log file)
    :param img_line_regex:  regular expression (str) which identifies the lines of interest;
                            the default matches lines like
                            "... DEBUG - Image: /home/sascha/Devel/xaidia-server/Classifier/images/2023-06-27_12-59-41_C50.png, ..."
    :return:                container as created by `_get_fpath_container_from_path_list`
    """
    if img_line_regex is None:
        img_line_regex = ".*DEBUG - Image: /home/sascha/Devel/xaidia-server/Classifier/images.*"
    img_lines = get_relevant_lines(all_lines, regex=img_line_regex, return_indices=False)

    # the path is located between "Image:" and the next comma
    fpaths = [line.split("Image:")[1].split(",")[0].strip() for line in img_lines]

    # the extraction of the timestamps is shared with the other get_img_filenames_* functions
    return _get_fpath_container_from_path_list(fpaths)
def plot_histogram_of_time_deltas(time_deltas):
    """
    Show a histogram of the given time deltas (useful for debugging).

    :param time_deltas: array of time differences; it is not changed
    """
    # work on a copy; all values >= 10 are collapsed to 10 (for better visualization)
    capped_deltas = time_deltas.copy()
    capped_deltas[time_deltas >= 10] = 10

    # fine-grained bins up to 9, one big bin for the rest
    bin_edges = [*np.arange(0, 1, .1), *np.arange(1, 9, .1), 10.1]
    plt.hist(capped_deltas, bins=bin_edges, rwidth=0.8, log=False)
    plt.show()
class MainManager:
    """
    Command line application: parses the arguments, evaluates the log file and creates the
    step history output (images or csv/xlsx tables) for the images of interest.
    """

    # names of the summary columns which are appended by `_add_dwell_time_stats`
    _STAT_COLUMNS = ("dt_sum", "dt_mean", "dt_med")

    def __init__(self):
        self.parse_args()
        self.load_logfile()

    def parse_args(self):
        """Parse the command line arguments and store them in `self.args`."""
        parser = argparse.ArgumentParser(
            prog='chimcla_step_history',
            description='evaluate the conveyor belt history of chocolate images',
        )
        parser.add_argument('--logfile', "-l", help="the logfile to be evaluated", required=True)
        parser.add_argument("--image-dir", "-i", help="the image dir to be evaluated")
        parser.add_argument("--fpaths", "-p", help="text file containing paths")

        # this is intended to store all relevant information in a database to be easily
        # accessible from other scripts
        parser.add_argument("--db-mode", "-dm", help="start in database-mode", action="store_true")

        # this mode iterates over directories given by PATTERN and processes csv files
        # (ignoring entries based on CRIT_SCORE_LIMIT)
        parser.add_argument("--csv-mode", "-cm", help="start in csv-mode", nargs=2, metavar=("PATTERN", "CRIT_SCORE_LIMIT"))

        self.args = parser.parse_args()

    def load_logfile(self):
        """
        Load the log file and determine the relevant (= realistic) step events.

        Results: `self.all_lines`, `self.relevant_lines0`, `self.tdm0` (all step events),
        `self.relevant_lines1`, `self.relevant_idcs1`, `self.tdm1` (without those events which
        come too short after the previous one).

        :raises ValueError: if the log file does not contain any step event
        """
        # get lines of log file
        self.all_lines = load_lines(self.args.logfile)

        # regex_str = r".*Value = moving \(True\).*"
        regex_str = r".*Value = stop \(False\).*"

        # get indices of relevant lines
        relevant_idcs0 = get_relevant_lines(self.all_lines, regex=regex_str, return_indices=True)
        if len(relevant_idcs0) == 0:
            raise ValueError(f"no line of {self.args.logfile} matches {regex_str!r}")

        # get the actual lines (corresponding to these indices);
        # only the relevant lines are converted to an array (not the whole log file)
        self.relevant_lines0 = np.array([self.all_lines[i] for i in relevant_idcs0])

        # create first auxiliary manager instance
        self.tdm0 = TimeDeltaManager(self.relevant_lines0)

        # here we interpret *every* line matching `regex_str` as relevant line.
        # However, some of these lines come with unrealistic little delay after each other.
        # The next step is to determine the limit time, i.e. the minimal time between two events
        # which is considered realistic. This is done by looking at the histogram. The working
        # hypothesis thereby: unrealistic short intervals should occur only seldom.
        # Also, we know that the usual interval is about 3s.
        if 0:
            # histogram for decision where to set the limit value
            # (this block normally should not be executed)
            plot_histogram_of_time_deltas(self.tdm0.time_deltas)
            exit()

        # by looking at the histogram this value was chosen:
        delta_limit = 2.6

        # sort out those lines which are too short after the previous step
        self.relevant_lines1 = [self.relevant_lines0[0]]
        self.relevant_idcs1 = [relevant_idcs0[0]]  # this is to save the indices of the original log file

        # iterate over the time deltas and discard a log line if it comes too short after the
        # last accepted one; time_deltas[k] refers to the lines k and k+1 -> enumeration starts at 1
        # NOTE(review): tdm0 sorts and de-duplicates the timestamps; the index pairing below
        # assumes chronologically ordered log lines with unique timestamps -- confirm
        dt_saved = 0
        for i, dt in enumerate(self.tdm0.time_deltas, start=1):
            if dt_saved + dt >= delta_limit:
                self.relevant_lines1.append(self.relevant_lines0[i])
                self.relevant_idcs1.append(relevant_idcs0[i])
                dt_saved = 0
            else:
                dt_saved += dt

        self.tdm1 = TimeDeltaManager(self.relevant_lines1)

        if 0:
            plot_histogram_of_time_deltas(self.tdm1.time_deltas)
            exit()

        # consistency check: the filter accepts gaps >= delta_limit, thus equality is allowed
        # here as well; the small tolerance covers float rounding of the summed deltas
        remaining_deltas = self.tdm1.time_deltas
        assert len(remaining_deltas) == 0 or min(remaining_deltas) >= delta_limit - 1e-6

    def main(self):
        """Run the mode which was selected via the command line arguments."""
        if self.args.csv_mode:
            # self.handle_csv_mode()
            self.handle_csv_mode_count_value()
        elif self.args.db_mode:
            self.create_db_with_filenames()
        else:
            self.create_position_time_images()

    def _collect_csv_data(self):
        """
        Common first part of the csv modes: evaluate the csv-mode arguments, create the result
        directory and add one column per relevant image to the module-level table `df_csv`.
        """
        self.pattern, self.crit_score_limit = self.args.csv_mode
        relevant_img_df = self._get_relevant_images()

        assert self.pattern.count("*") == 1, "PATTERN must contain exactly one '*'"
        self.result_dir = self.pattern.replace("*", "_all")
        os.makedirs(self.result_dir, exist_ok=True)

        # normally for performance reasons iteration over pandas df rows is not recommended
        # here, simplicity matters more
        for img_row in tqdm(relevant_img_df.itertuples(index=False)):
            # self._create_combined_image(img_row)
            self._create_combined_image_csv(img_row)

    @staticmethod
    def _add_dwell_time_stats(df):
        """
        Append the columns dt_sum, dt_mean and dt_med (row-wise sum, mean and median -> dwell
        time per position) to `df` (in place).

        All three values are calculated from the data columns only, i.e. the summary columns
        themselves do not contribute to the mean and the median.
        """
        data = df.drop(columns=list(MainManager._STAT_COLUMNS), errors="ignore")
        df["dt_sum"] = data.sum(axis=1, numeric_only=True)
        df["dt_mean"] = data.mean(axis=1, numeric_only=True)
        df["dt_med"] = data.median(axis=1, numeric_only=True)

    @staticmethod
    def _write_dwell_time_results():
        """Write `df_csv` to results.csv and results.xlsx (in the current working directory)."""
        df_csv.to_csv("results.csv", sep='\t', index=True, index_label='idx')
        df_csv.to_excel("results.xlsx", index_label='idx')

    @staticmethod
    def _timestamp_from_basename(basename):
        """Convert e.g. '2023-06-26_08-50-55_C50' to '2023-06-26 08:50:55'."""
        date_str, time_str, *_ = basename.split("_")
        return f"{date_str} {time_str.replace('-', ':')}"

    def handle_csv_mode(self):
        """
        csv mode (variant 1): export the position time vectors of all relevant images together
        with their row-wise sum, mean and median and plot the sums. Ends the program.
        """
        self._collect_csv_data()
        self._add_dwell_time_stats(df_csv)

        print(df_csv)
        self._write_dwell_time_results()

        # read csv data and plot it
        df_sum = pd.read_csv('results.csv', sep='\t', index_col=0)
        df_sum["dt_sum"].plot(kind='bar', y='dwell_time')

        # convert to numpy array
        # alternative: df_sum["dt_abs"].plot(kind = 'bar', y = 'dwell_time', xticks = range(0,1500,100))
        np_sum = df_sum.dt_sum.to_numpy()
        plt.plot(np_sum)
        plt.clf()  # but: plot only one times doesn't format the graphic correctly
        plt.plot(np_sum)
        plt.show()

        IPS()
        exit()

    def handle_csv_mode_count_value(self):
        """
        csv mode (variant 2): like `handle_csv_mode`, but additionally count for every position
        how often the delay reaches the threshold (10 s), export these counts and plot them
        (instead of the sums). Ends the program.
        """
        self._collect_csv_data()

        # important here: copy before calculating additional columns
        df_count = df_csv.copy()

        self._add_dwell_time_stats(df_csv)
        self._write_dwell_time_results()

        ########################################################################################
        # count variable - increments if delay is greater than (or equal to) x seconds
        ########################################################################################
        delay_threshold = 10
        n_rows, n_cols = df_count.shape
        print("Rows = {}, Cols = {}".format(n_rows, n_cols))

        # nan-entries compare as False and thus are counted as 0
        exceeds = df_count >= delay_threshold
        delay_values = df_count.to_numpy()
        # np.argwhere iterates row by row (same order as two nested loops over rows and columns)
        for row, col in np.argwhere(exceeds.to_numpy()):
            print("Delay > {}s: val = {}, row = {}, col = {}".format(delay_threshold, delay_values[row, col], row, col))

        # 1.0 where the threshold is reached, 0.0 elsewhere
        df_count = exceeds.astype(float)

        # calculate sum of critical times for each row
        df_count['count_sum'] = df_count.sum(axis=1, numeric_only=True)

        df_count.to_csv("results_count.csv", sep='\t', index=True, index_label='idx')
        df_count.to_excel("results_count.xlsx", index_label='idx')

        # read csv data and plot it
        df_plot_count = pd.read_csv('results_count.csv', sep='\t', index_col=0)
        df_plot_count["count_sum"].plot(kind='bar', y='count_sum')

        # convert to numpy array
        np_count = df_plot_count.count_sum.to_numpy()
        plt.plot(np_count)
        plt.clf()  # but: plot only one times doesn't format the graphic correctly
        np_count = df_count.count_sum.to_numpy()
        plt.plot(np_count)
        plt.show()

        IPS()
        exit()

    def _create_combined_image_csv(self, img_row):
        """
        Add the (rounded) position time vector of one image as new column to `df_csv`
        (for further data analysis).

        :param img_row: row of the image table; fields: .basename, .dir, .criticality
        """
        end_time = self._timestamp_from_basename(img_row.basename)
        station_time_vector = self.tdm1.get_position_time_vector(end_time)

        # add current (rounded) time vector data as column to csv data frame
        df_csv[img_row.basename] = station_time_vector.round(2).tolist()

    def _create_combined_image(self, img_row):
        """
        Write a jpg file (to `self.result_dir`) which shows the original image and below it
        the graph of the position time vector.

        :param img_row: row of the image table; fields: .basename, .dir, .criticality
        :raises OSError: if the image could not be written
        """
        end_time = self._timestamp_from_basename(img_row.basename)
        station_time_vector = self.tdm1.get_position_time_vector(end_time)

        _, orig_img_arr = self._get_original_file(img_row)

        # adapt the size of the graph such that its width matches the original width
        ratio = 2
        orig_width = orig_img_arr.shape[1]

        # this is in inches (200dpi empirically determined)
        fig_height_in = orig_width/200
        fig_width_in = fig_height_in*ratio

        fig = plt.figure(figsize=(fig_width_in, fig_height_in))
        plt.plot(station_time_vector)
        ax = plt.gca()
        ax.set_yscale('asinh', linear_width=10, base=0)
        plt.ylim(-1, 1e4)
        plt.xticks(np.arange(0, len(station_time_vector) + 100, 100))
        yticks = [0, 3, 10, 30, 100, 300, 1000, 3000]
        plt.yticks(yticks)
        ax.set_yticklabels([str(tick) for tick in yticks])
        plt.grid()

        fig_arr = self._fig_to_array(fig)
        plt.close()
        fig_arr = cv2.cvtColor(fig_arr, cv2.COLOR_RGB2BGR)

        # both parts need the same width and the same number of channels
        assert orig_img_arr.shape[1:] == fig_arr.shape[1:]
        joint_array = np.concatenate((orig_img_arr, fig_arr))

        fname = f"S{img_row.criticality}_{img_row.basename}_Cx.jpg"
        fpath = os.path.join(self.result_dir, fname)

        # cv2.imwrite reports problems only via its return value
        if not cv2.imwrite(fpath, joint_array, [cv2.IMWRITE_JPEG_QUALITY, 98]):
            raise OSError(f"could not write {fpath}")

    def _get_original_file(self, img_row) -> Tuple[str, np.ndarray]:
        """
        Find and load the original image which belongs to `img_row`.

        :param img_row: row of the image table; fields: .basename, .dir, .criticality
        :return:        tuple (file path, image array as loaded by cv2.imread)
        :raises OSError: if the file could not be read as image
        """
        pattern = os.path.join(img_row.dir, f"*{img_row.basename}*")
        flist = glob.glob(pattern)
        assert len(flist) == 1, f"expected exactly one file for {pattern}, found {len(flist)}"
        fpath = flist[0]

        # cv2.imread does not raise but returns None if the file cannot be read
        img_arr = cv2.imread(fpath)
        if img_arr is None:
            raise OSError(f"could not read image {fpath}")
        return fpath, img_arr

    def _fig_to_array(self, fig):
        """
        Render the matplotlib figure and return it as RGB array with the shape
        (height, width, 3), where height and width are the logical size of the canvas.

        :param fig: matplotlib figure (with an Agg based canvas)
        """
        # taken from https://stackoverflow.com/a/57988387
        fig.tight_layout(pad=0)
        plt.subplots_adjust(left=.08)
        ax = plt.gca()

        # To remove the huge white borders
        ax.margins(0)
        fig.canvas.draw()

        # logical size of the canvas (height, width), e.g. 450, 900
        dim0, dim1 = fig.canvas.get_width_height()[::-1]

        # buffer_rgba replaces tostring_rgb (which is deprecated since matplotlib 3.8);
        # the buffer has the shape (rows, columns, 4) -> drop the alpha channel
        rgba_arr = np.asarray(fig.canvas.buffer_rgba())
        image_from_plot = np.ascontiguousarray(rgba_arr[:, :, :3])

        if image_from_plot.shape[:2] != (dim0, dim1):
            # this can happen on displays with higher resolution: the rendered image is bigger
            # than the logical size -> downsample it
            # note: cv2.resize expects the dimension in order: width, height, i.e. dim1, dim0
            image_from_plot = cv2.resize(image_from_plot, (dim1, dim0), interpolation=cv2.INTER_LINEAR)

        assert image_from_plot.shape == (dim0, dim1, 3)
        return image_from_plot

    def _get_relevant_images(self) -> pd.DataFrame:
        """
        Collect those images whose criticality is above `self.crit_score_limit` from the
        criticality lists of all directories matching `self.pattern`.

        :return:    DataFrame with the columns dir, basename, criticality (and possible further
                    columns of the csv files), sorted by descending criticality
        :raises FileNotFoundError:  if none of the directories contains a criticality list
        """
        CSV_FNAME = "_criticality_list.csv"
        self.crit_score_limit = int(self.crit_score_limit)
        dirs = glob.glob(self.pattern)
        assert len(dirs) > 0, f"Could not find any directory for pattern {self.pattern}"

        selected_frames = []
        for dirpath in sorted(dirs):
            if dirpath.endswith("_all"):
                # this is the result directory of a csv-mode run
                continue
            csv_fpath = os.path.join(dirpath, CSV_FNAME)
            if not os.path.isfile(csv_fpath):
                # temporarily ignore incomplete directories (they are created in parallel)
                continue

            # all lines; looks like:
            #    Unnamed: 0                 basename  criticality
            # 0           3  2023-06-26_08-50-55_C50     40268.73
            # 1          82  2023-06-26_20-18-05_C50      2894.02
            # ...
            df = pd.read_csv(csv_fpath)
            df.drop(columns=["Unnamed: 0"], inplace=True, errors="ignore")

            # only those meeting the condition (copy -> the selection can be modified safely)
            df_selected = df[df.criticality > self.crit_score_limit].copy()
            df_selected.insert(0, "dir", dirpath)
            selected_frames.append(df_selected)

        if not selected_frames:
            msg = f"Could not find any {CSV_FNAME} in the directories for pattern {self.pattern}"
            raise FileNotFoundError(msg)

        res = pd.concat(selected_frames, ignore_index=True)
        res.sort_values("criticality", ascending=False, inplace=True)
        return res

    def create_db_with_filenames(self):
        """Database mode (not yet implemented)."""
        c = get_img_filenames_from_logfile(all_lines=self.all_lines)

        msg = "It is not trivial to efficiently store a map from filename to position time vectors"
        # currently we can stick with the get_img_filenames_from_file option
        raise NotImplementedError(msg)

    def create_position_time_images(self):
        """
        Default mode: for every image of interest write the graph of the position time vector
        (<basename>_ptv.png) and the corresponding table (<basename>_tab.csv) to the
        directory "output".

        The images of interest are taken from (in this order of priority): the text file given
        by --fpaths, the directory given by --image-dir, the log file.
        """
        dirname = "output"
        os.makedirs(dirname, exist_ok=True)

        # now get filenames of images of interest
        if self.args.fpaths:
            c = get_img_filenames_from_file(self.args.fpaths)
        elif self.args.image_dir:
            c = get_img_filenames_from_dir(self.args.image_dir)
        else:
            # those images which are present in the logfile
            c = get_img_filenames_from_logfile(all_lines=self.all_lines)

        # create final results (visualization of position time vector)
        for basename, ts in c.time_stamps:
            position_time_vector, abs_times_str = self.tdm1.get_position_time_vector(ts, return_abs_times=True)
            plt.plot(position_time_vector)
            plt.title(basename)
            img_fpath = os.path.join(dirname, f"{basename}_ptv.png")
            tab_fpath = os.path.join(dirname, f"{basename}_tab.csv")

            df1 = pd.DataFrame({"duration": np.round(position_time_vector, 3), "timestamp": abs_times_str})
            df1.to_csv(tab_fpath)
            plt.savefig(img_fpath)
            print(f"{img_fpath} written")
            plt.close()
# this is executed by the cli script (see pyproject.toml)
def main():
    """Entry point of the cli script: set up the manager and run the selected mode."""
    MainManager().main()


# obsolete but does not harm
if __name__ == "__main__":
    main()