Source code for chimcla.util_file_sorting
"""
This module contains classes and functions for the *Lot Preparation*
step of the processing pipeline.
It is used by cli.py.
"""
import os
import sys
import argparse
import numpy as np
import addict
from tqdm import tqdm
import yaml
from ipydex import IPS, activate_ips_on_exception
[docs]
def diff_to_days(diff):
    """
    Convert a time difference into a number of days.

    :param diff:    object providing `total_seconds()` (e.g. `datetime.timedelta`)
    :return:        the duration in days as a plain float, rounded to 3 decimals
    """
    seconds_per_day = 24 * 3600
    days = diff.total_seconds() / seconds_per_day
    return float(np.round(days, 3))
[docs]
def fname_to_date_str(fname):
    """
    Return the first two underscore-separated fields of a file name, joined by "_".

    Example: "2023-06-26_06-19-53_C50.jpg" -> "2023-06-26_06-19-53"

    :param fname:   file name (string)
    :return:        string consisting of at most the two leading fields
    """
    # everything after the second underscore is irrelevant, so stop splitting there
    leading_fields = fname.split("_", 2)[:2]
    return "_".join(leading_fields)
[docs]
class Lot(addict.Addict):
    """
    A Lot-instance models a production lot (typically spanning several days)

    A lot is described by the index range `start_index` .. `end_index` (inclusive) into a
    list of file names sorted by time stamp. All other entries are derived from that range
    by `set_var_values`.
    """

    def __init__(self, start_index, part_size: int = 1000):
        """
        :param start_index:     index of the first file which belongs to this lot
        :param part_size:       number of files per part directory (stored with the lot)
        """
        super().__init__()
        self.start_index = start_index

        # the following values are unknown until the lot is complete
        self.end_index = None
        self.first_file = None
        self.last_file = None
        self.pause_days = None
        self.duration_days = None
        self.duration_hours = None
        self.number_of_images = None
        self.dirname = None
        self.part_size = part_size

    def set_var_values(self, dt_objs: np.ndarray, fnamelist: list[str]):
        """
        Calculate the derived values of the lot; `self.end_index` must have been set before.

        :param dt_objs:     sequence of datetime objects (one per file, same order as `fnamelist`)
        :param fnamelist:   list of file names (sorted)
        """
        assert self.end_index is not None

        duration = dt_objs[self.end_index] - dt_objs[self.start_index]
        self.duration_days = diff_to_days(duration)
        self.duration_hours = int(np.round(duration.total_seconds()/3600))

        # both indices are inclusive
        self.number_of_images = self.end_index - self.start_index + 1
        self.first_file = fnamelist[self.start_index]
        self.last_file = fnamelist[self.end_index]

        date_str = fname_to_date_str(self.first_file)
        if self.number_of_images < 1e4:
            number_of_images_str = str(self.number_of_images)
        else:
            # the suffix "k" means thousands, thus the divisor must not depend on part_size
            number_of_images_str = f"{(self.number_of_images/1000):3.1f}k"

        self.dirname = f"{date_str}__{int(self.duration_days)}d__{number_of_images_str}"
[docs]
def split_into_lots(pathlist: str, part_size: int = 1000):
    """
    Distribute a big list of files (with time stamp names) into a structure of subdirectories
    ("lots" which contain "parts").

    :param pathlist:    path of a text file with one path per line; the listed paths are
                        joined with the directory of that text file
    :param part_size:   maximum number of files per part directory

    Each lot corresponds to a production cycle without major interruptions (e.g. 3 days).
    A new lot is started whenever two consecutive time stamps are at least one day apart.

    Side effects: writes `metadata.yaml` next to the list file and moves the listed files to
    `lots/<lot dirname>/partNNN/` (also next to the list file). Lots whose directory already
    exists are skipped.

    :raises ValueError:     if the list is empty or contains duplicated file names
    """
    import collections
    import datetime as dt
    import shutil

    # check the type first, otherwise the comparison itself might fail for non-numbers
    assert isinstance(part_size, int) and part_size > 0

    activate_ips_on_exception()

    basedir = os.path.split(pathlist)[0]
    print(f"reading {pathlist} ...")
    with open(pathlist, "r") as fp:
        # drop line breaks and ignore blank lines (e.g. a trailing empty line)
        paths = [line.strip() for line in fp if line.strip()]

    if not paths:
        raise ValueError(f"No paths found in {pathlist}.")

    def get_fname(path):
        return os.path.split(path)[1]

    # the file names start with a time stamp, thus this is a chronological order
    paths.sort(key=get_fname)
    fnamelist = [get_fname(path) for path in paths]

    cnt = collections.Counter(fnamelist)
    dupes = [path for path, fname in zip(paths, fnamelist) if cnt[fname] > 1]
    if dupes:
        msg = (
            f"Unexpectedly {len(dupes)} duplicated files have been found:\n {dupes[:10]}. "
            "Please resolve that manually."
        )
        raise ValueError(msg)

    date_strings = [fname_to_date_str(fname) for fname in fnamelist]
    date_format = r"%Y-%m-%d_%H-%M-%S"
    dt_objs = np.array([dt.datetime.strptime(date_str, date_format) for date_str in date_strings])

    # diffs[i] is the time between file i and file i + 1
    diffs = np.diff(dt_objs)

    metadata = addict.Addict()
    metadata.pauses = []
    metadata.lots = [Lot(start_index=0, part_size=part_size)]

    for i, diff in enumerate(diffs):
        if diff >= dt.timedelta(days=1):
            # file i is the last file of the current lot ...
            finished_lot = metadata.lots[-1]
            finished_lot.end_index = i
            finished_lot.set_var_values(dt_objs, fnamelist)

            # ... and file i + 1 is the first file of the next one
            new_lot = Lot(start_index=i + 1, part_size=part_size)
            new_lot.pause_days = diff_to_days(diff)
            metadata.lots.append(new_lot)

    # note: the last lot might not yet be finished, but we assume it anyway
    # (do not rely on a loop variable here: `diffs` is empty if there is only one file)
    last_lot = metadata.lots[-1]
    last_lot.end_index = len(fnamelist) - 1
    last_lot.set_var_values(dt_objs, fnamelist)

    meta_data_fname = os.path.join(basedir, "metadata.yaml")
    with open(meta_data_fname, "w") as fp:
        yaml.safe_dump(metadata.to_dict(), fp)

    for lot in metadata.lots:
        # this is to prevent double work
        # TODO: explicitly handle incomplete lots from the last run
        dir_path = os.path.join(basedir, "lots", lot.dirname)
        if os.path.exists(dir_path):
            continue

        for i in tqdm(range(lot.start_index, lot.end_index + 1)):
            counter = i - lot.start_index
            if counter % part_size == 0:
                # a new part begins -> create its directory
                part_dir = f"part{(counter//part_size):03d}"
                part_dir_path = os.path.join(dir_path, part_dir)
                os.makedirs(part_dir_path, exist_ok=True)

            original_path = os.path.join(basedir, paths[i])
            if os.path.exists(original_path):
                # no shell involved -> spaces or special characters in paths are harmless;
                # the explicit target file name makes an existing target be replaced
                shutil.move(original_path, os.path.join(part_dir_path, fnamelist[i]))

        print(f"processed: {dir_path}")
    print("all done")