import os
from collections import namedtuple
import micvbang as mvb
ReadAppendFile = namedtuple('ReadAppendFile', ['open_read', 'open_append'])
[docs]class ProgressTracker(object):
""" Track and continue progress over iterators, saving state in a file-like object.
"""
DEFAULT_F_NAME = 'progress.gz'
def __init__(self, it, get_id=None, f=None, flush_freq=0, print_skips_freq=0):
"""
Arguments:
it(iterable): Iterable used to to make the iterator to track the progress of.
get_id(function): Function mapping values generated from :param it: to unique ids of type string.
.. note::
:param get_id: must return a string value that does **not** contain newlines.
"""
self._it = it
self._get_id = get_id or (lambda x: str(x))
self._flush_freq = flush_freq
self._print_skips_freq = print_skips_freq
self._made_f = self._make_f(f)
self._progress_f = None
self.skips = 0
self._ids = set()
self._closed = False
self._f = f
def _make_f(self, f_in):
f = None
if f_in is None:
f_in = mvb.here(self.DEFAULT_F_NAME)
if self._is_file_like(f_in):
f = f_in
elif type(f_in) is ReadAppendFile:
f = f_in
elif type(f_in) is str:
_, ext = os.path.splitext(f_in)
if ext == '.gz':
f = ReadAppendFile(
open_read=lambda: mvb.open(f_in, mode='rt'),
open_append=lambda: mvb.open(f_in, mode='at')
)
else:
f = mvb.open(f_in, 'a+')
return f
def _is_file_like(self, f):
return all(getattr(f, attr, False) for attr in ['read', 'write', 'seek'])
def _print_skips(self):
if self._print_skips_freq and self.skips % self._print_skips_freq == 0:
print(" ... skipped {} ...".format(self.skips))
def _flush(self, num_iter):
if self._flush_freq and (num_iter - self.skips) % self._flush_freq == 0:
getattr(self._progress_f, 'flush', lambda: None)()
[docs] def processed(self, id):
""" Mark an id as processed.
This means that values with the given id will **not** be returned when creating
iterators using the same progress file.
"""
if id not in self._ids:
self._ids.add(id)
self._progress_f.write("{id}\n".format(id=id))
[docs] def close(self):
""" Close :class:`ProgressTracker`. This ensures that no iterators created from the
instance will progress any further.
"""
self._closed = True
def __iter__(self):
return self.iter()
[docs] def iter(self):
""" Return an iterator that iterates over the given input iterator and
automatically tracks its progress. :func:`processed` will
be called **before** each value is returned to the user.
.. note::
Potential off-by-one error here; all ids are marked as `processed`
**before** they are returned and will therefore never be returned again.
If the program crashes and the id was not in fact processed by user code,
it will go unprocessed.
"""
for id, value in self.iter_ids():
self.processed(id)
yield value
def _read_ids(self, f):
readlines = getattr(f, 'readlines', False)
if readlines:
return set(l[:-1] for l in readlines())
return set(f.read().split('\n'))
def _init_ids(self, f):
if type(f) is ReadAppendFile:
try:
with self._made_f.open_read() as f:
return self._read_ids(f)
except FileNotFoundError:
return set()
f.seek(0)
return self._read_ids(f)
[docs] def iter_ids(self):
""" Return an iterator that yields an (id, data)-tuple. In order to mark an
iteration as processed, :func:`processed` must be called with the given id.
"""
self._ids = self._init_ids(self._made_f)
progress_f = self._made_f
if type(progress_f) is ReadAppendFile:
progress_f = self._made_f.open_append()
self._progress_f = progress_f
if self._closed:
return
with self._progress_f:
for num_iter, value in enumerate(self._it):
if self._closed:
return
id = self._get_id(value)
if id in self._ids:
self.skips += 1
self._print_skips()
continue
yield id, value
self._flush(num_iter)