PK!5zeitig/__init__.py__version__ = '0.3.2' PK!׉zeitig/aggregates.py# Copyright 2018 Oliver Berger # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import logging import pendulum from zeitig import events, utils local_timezone = pendulum.local_timezone() class Summary: def __init__(self): self.start = None self.end = None self.works = pendulum.duration() self.breaks = pendulum.duration() def apply_event(self, event, last_breaks=None): if isinstance(event, events.Work): if self.start is None: self.start = event.start self.end = event.end self.works += event.period # only if we have another work, we add last breaks for b in last_breaks: self.breaks += b.period last_breaks.clear() elif isinstance(event, events.Break)\ and self.start is not None: # we collect breaks after start last_breaks.append(event) @classmethod def aggregate(cls, iter_events): summary = cls() last_breaks = [] for event in iter_events: summary.apply_event(event, last_breaks) yield event yield summary class JoinedWorkDay(events.Situation): """Creates a new Situation for a whole day.""" def __init__(self, date, *, tags=None, duration=None): self.date = date super().__init__( start=date.start_of('day'), end=(date + pendulum.duration(days=1)).start_of('day'), tags=tags ) self.duration = (duration if duration is not None else pendulum.duration()) def __eq__(self, other): return ( self.start == other.start and self.end == other.end and self.duration == other.duration ) @property def unique_tags(self): seen = set() unique_tags = [] for tag in self.tags: if tag in seen: continue seen.add(tag) unique_tags.append(tag) return unique_tags def add_work(self, work_event): self.tags.extend(work_event.tags) self.notes.extend(work_event.notes) self.duration += work_event.period.as_interval() @classmethod def aggregate(cls, iter_events): actual_day = None for event in iter_events: if isinstance(event, events.Work): if not actual_day: actual_day = JoinedWorkDay(event.start) else: dt_change = DatetimeChange(actual_day, event) if dt_change.is_new_day: yield actual_day actual_day = JoinedWorkDay(event.start) actual_day.add_work(event) yield event else: # yield the last day yield actual_day def __repr__(self): return (f'<{self.__class__.__name__}' f' [{self.start}, {self.end}) {self.duration}>') class DatetimeChange: def __init__(self, last_event, event): self.last_event = last_event self.event = event def __repr__(self): return f'<{self.__class__.__name__}: {self.before} - {self.now}>' @utils.reify def before(self): return self.last_event.local_start if self.last_event else None @utils.reify def now(self): return self.event.local_start @utils.reify def is_new_day(self): if not self.before: return True last_day = self.before.start_of('day') current_day = self.now.start_of('day') return (current_day - last_day).in_days() @utils.reify def is_new_week(self): if not self.before: return True last_week = self.before.start_of('week') current_week = self.now.start_of('week') return (current_week - last_week).in_weeks() @utils.reify def is_new_month(self): if not self.before: return True # as long as deprecated start_of_month last_month = self.before.replace(day=1, hour=0, minute=0, second=0, microsecond=0) current_month = self.now.replace(day=1, hour=0, minute=0, second=0, microsecond=0) return (current_month - last_month).in_months() @utils.reify def is_new_year(self): if not self.before: return True # as long as deprecated start_of_year last_year = self.before.replace(month=1, day=1, hour=0, minute=0, second=0, microsecond=0) current_year = self.now.replace(month=1, day=1, hour=0, minute=0, second=0, microsecond=0) return (current_year - last_year).in_years() @utils.reify def has_changed(self): return self.is_new_day or self.is_new_week or self.is_new_month\ or self.is_new_year @classmethod def aggregate(cls, iter_events): last_event = None for event in iter_events: if isinstance(event, events.Situation): dt_change = cls(last_event, event) if dt_change.has_changed: yield dt_change last_event = event yield event class DatetimeStats: def __init__(self): self.working_days = [] self.summary = None def apply_event(self, event): if isinstance(event, DatetimeChange): if event.is_new_day and isinstance(event.event, events.Work): self.working_days.append(event.event.local_start.date()) if isinstance(event, Summary): self.summary = event @classmethod def aggregate(cls, iter_events): stats = cls() for event in iter_events: stats.apply_event(event) yield event yield stats @property def hours_per_working_day(self): return self.summary.works.total_hours() / len(self.working_days) def split_at_new_day(iter_events): """Split a situation if it overlaps a new day.""" for event in iter_events: if isinstance(event, events.Situation): yield from event.split_local_overnight() else: yield event def filter_no_breaks(iter_events): """Stop yielding `Break`s.""" for event in iter_events: if isinstance(event, events.Break): continue yield event PK!fB""zeitig/debug.pyimport sys def info(type, value, tb): if hasattr(sys, 'ps1') or not sys.stderr.isatty(): # we are in interactive mode or we don't have a tty-like # device, so we call the default hook sys.__excepthook__(type, value, tb) else: import traceback import ipdb # we are NOT in interactive mode, print the exception... print traceback.print_exception(type, value, tb) # ...then start the debugger in post-mortem mode. ipdb.post_mortem(tb) sys.excepthook = info PK!^m^)^)zeitig/events.py# Copyright 2018 Oliver Berger # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import collections import datetime import re import sys import pendulum from . import utils PY_37 = sys.version_info >= (3, 7) local_timezone = pendulum.local_timezone() if PY_37: PATTERN_TYPE = re.Pattern else: PATTERN_TYPE = re._pattern_type class Interval: def __init__(self, *, start=None, end=None): self.start = start self.end = end @utils.reify def local_start(self): return self.start.in_tz(local_timezone) if self.start else None @utils.reify def local_end(self): return self.end.in_tz(local_timezone) if self.end else None @utils.reify def local_period(self): if self.local_start is not None and self.local_end is not None: local_period = self.local_end - self.local_start return local_period return None @utils.reify def period(self): if self.start is not None and self.end is not None: period = self.end - self.start return period return None @utils.reify def is_local_overnight(self): if self.local_start: # either end or local now end = self.local_end or utils.utcnow() period = end.date() - self.local_start.date() return period.total_days() > 0 # return None if no start is given return None def __repr__(self): return (f'<{self.__class__.__name__}' f' [{self.start}, {self.end}) {self.period}>') class Situation(Interval): def __init__(self, *args, tags=None, note=None, **kwargs): super().__init__(*args, **kwargs) self.tags = tags if tags is not None else [] self.notes = [note] if note is not None else [] def split_local_overnight(self): """Split the situation at local day changes.""" if self.is_local_overnight: next_start = self.local_start next_end = next_start.add(days=1).start_of('day') while next_end < self.local_end: situation = self.__class__( start=next_start.in_tz('UTC'), end=next_end.in_tz('UTC')) situation.tags = self.tags situation.notes = self.notes yield situation next_start = next_end next_end = next_start.add(days=1).start_of('day') # finish end situation = self.__class__( start=next_start.in_tz('UTC'), end=self.local_end.in_tz('UTC')) situation.tags = self.tags situation.notes = self.notes yield situation else: # do not split otherwise yield self def __repr__(self): return (f'<{self.__class__.__name__}' f' [{self.start}, {self.end}) {self.period.as_interval()}' f' - {self.tags}, {self.notes}>') class Work(Situation): pass class Break(Situation): pass class NoDefault: """Just a marker class to represent no default. This is to separate really nothing and `None`. """ class Parameter: """Define an `Event` parameter.""" def __init__(self, *, default=NoDefault, deserialize=None, serialize=None, description=None): self.__name__ = None self.default = default self.description = description self.deserialize = deserialize self.serialize = serialize def __get__(self, instance, owner): if instance is None: return self try: value = instance.__dict__[self.__name__] # we explicitelly keep original data if callable(self.deserialize): value = self.deserialize(value) return value except KeyError: if self.default is NoDefault: raise AttributeError( 'The Parameter has no default value ' f'and another value was not assigned yet: {self.__name__}' ) default = self.default()\ if callable(self.default) else self.default return default def __set__(self, instance, value): # just store the value if callable(self.serialize): value = self.serialize(value) instance.__dict__[self.__name__] = value def __set_name__(self, owner, name): self.__name__ = name class _EventMeta(type): __event_base__ = None __events__ = {} def __new__(mcs, name, bases, dct): """Create Command class. Add command_name as __module__:__name__ Collect parameters """ cls = type.__new__(mcs, name, bases, dct) if mcs.__event_base__ is None: mcs.__event_base__ = cls else: default_type = dct.get('__type__', name.lower()) mcs.__events__[default_type] = cls return cls def __init__(cls, name, bases, dct): super().__init__(name, bases, dct) cls.__params__ = params = collections.OrderedDict() if cls.__event_base__: for base in cls.__mro__: base_params = [(n, p) for (n, p) in base.__dict__.items() if isinstance(p, Parameter)] if base_params: params.update(base_params) # set parameter names in python < 3.6 if sys.version_info < (3, 6): for name, param in params.items(): param.__set_name__(cls, name) def __call__(cls, *, type=None, **params): cls = cls.__events__.get(type, cls) inst = super().__call__(type=type or cls.__type__, **params) return inst def validate_when(value): """Used to convert between pendulum and other types of datetime.""" if isinstance(value, datetime.datetime): value = pendulum.from_timestamp(value.timestamp(), tz='UTC') elif not isinstance(value, pendulum.DateTime): value = pendulum.parse(value) value = value.in_tz('UTC') return value def validate_list(value): if not isinstance(value, list): value = list(value) return value class Event(metaclass=_EventMeta): __type__ = None when = Parameter( default=utils.utcnow(), deserialize=validate_when, description='Time of the event.' ) type = Parameter( description='Some situation started and another finished before' ) tags = Parameter( default=list, serialize=validate_list, description='A list of tags for the current situation.' ) def __init__(self, **params): super().__init__() if params is not None: for name, value in params.items(): setattr(self, name, value) def __iter__(self): for name in self.__params__: try: value = getattr(self, name) yield name, value except AttributeError: pass def __getitem__(self, item): if item in self.__params__: value = getattr(self, item) return value raise IndexError(f'Item not found: {item}') def source(self): """Generate key value pairs for all params.""" for name in self.__params__: try: value = self.__dict__[name] yield name, value except KeyError: pass def __repr__(self): tags = ' '.join(f'#{tag}' for tag in self.tags) return f'[{self.__type__} @ {self.when}{" " + tags if tags else ""}]' @property def local_when(self): when = self.when.in_tz(pendulum.local_timezone()) return when class SituationEvent: note = Parameter( default=None, description='Note for the current situation.' ) def create_situation(self): """Create a situation.""" situation = self.__situation__( start=self.when, tags=self.tags, note=self.note, ) return situation def close_situation(self, situation): """Close a situation and create the next one.""" situation.end = self.when return self.create_situation() class WorkEvent(Event, SituationEvent): __type__ = 'work' __situation__ = Work class BreakEvent(Event, SituationEvent): __type__ = 'break' __situation__ = Break class ActionEvent: pass class AddEvent(Event, ActionEvent): __type__ = 'add' note = Parameter( default=None, description='Note for the current situation.' ) def apply_to_situation(self, situation): for tag in self.tags: if tag not in situation.tags: situation.tags.append(tag) try: note = self.note except AttributeError: pass else: if note is not None: situation.notes.append(self.note) def serialize_note(value): if isinstance(value, PATTERN_TYPE): value = value.pattern elif not isinstance(value, str): value = str(value) return value def deserialize_note(value): if isinstance(value, str): value = re.compile(value) return value class RemoveEvent(Event, ActionEvent): __type__ = 'remove' note = Parameter( default=None, serialize=serialize_note, deserialize=deserialize_note, description='A regex matching the notes to remove.' ) def apply_to_situation(self, situation): for tag in self.tags: if tag in situation.tags: situation.tags.remove(tag) try: re_note = self.note except AttributeError: pass else: # flush old notes if we set a new via remove left_notes = [note for note in situation.notes if not re_note.match(note)] situation.notes = left_notes PK!B_bbzeitig/reporting.py# Copyright 2018 Oliver Berger # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import logging import click import colorama import crayons import jinja2 import pendulum import qtoml from zeitig import aggregates, events, sourcing, store, utils log = logging.getLogger(__name__) TEMPLATE_DEFAULTS_NAME = 'template_defaults.toml' TEMPLATE_SYNTAX_NAME = 'template_syntax.toml' TEMPLATE_PATH_NAME = 'templates' class ReportException(Exception): pass class ReportTemplateNotFound(ReportException): pass class State: def __init__(self, store): self.store = store def print(self, help): click.echo( f'Actual time: {pendulum.now().to_datetime_string()}' ) click.echo( f'\nStore used: {colorama.Style.BRIGHT}' f'{self.store.user_path}' f'{colorama.Style.RESET_ALL}') try: click.echo( f'\nActual group: {colorama.Style.BRIGHT}' f'{self.store.last_group}' f'{colorama.Style.RESET_ALL} of ' f'{", ".join(sorted(self.store.groups))}' ) sourcerer = sourcing.Sourcerer(self.store) last_event_path, last_event_stat = \ next(reversed(self.store.iter_names_created()), (None, None)) if last_event_path: last_event = sourcerer.load_event( store.EventSource(last_event_path.name)) last_situation = next( sourcerer.generate(start=last_event.when), None) if last_situation: click.echo( f'Last situation in {self.store.group_path.name}: ' f'{colorama.Style.BRIGHT}' f'{last_situation}' f'{colorama.Style.RESET_ALL} ' f'started at {colorama.Style.BRIGHT}' f'{last_situation.local_start}' f'{colorama.Style.RESET_ALL} since ' f'{last_situation.period.total_hours():.2f} hours' f'{" - " + ", ".join(last_situation.tags)}' ) except store.LastPathNotSetException: if self.store.groups: click.echo(f'{colorama.Fore.YELLOW}' 'Last group not persisted!' f'{colorama.Style.RESET_ALL}') else: click.echo( f'{colorama.Fore.RED}There is no activity recorded yet!' f'{colorama.Style.RESET_ALL}\n' ) click.echo(help) DEFAULT_JINJA_ENVS = { None: { 'trim_blocks': False, 'lstrip_blocks': True, 'keep_trailing_newline': True, 'autoescape': False, }, 'latex': { 'block_start_string': '\\BLOCK{', 'block_end_string': '}', 'variable_start_string': '\\VAR{', 'variable_end_string': '}', 'comment_start_string': '\\#{', 'comment_end_string': '}', 'line_statement_prefix': '%%', 'line_comment_prefix': '%#', 'trim_blocks': True, 'autoescape': False, } } class Templates: def __init__(self, store): self.store = store @utils.reify def user_defaults_file_path(self): defaults_file_path = \ self.store.user_path.joinpath(TEMPLATE_DEFAULTS_NAME) return defaults_file_path @utils.reify def group_defaults_file_path(self): defaults_file_path = \ self.store.group_path.joinpath(TEMPLATE_DEFAULTS_NAME) return defaults_file_path def join_template_defaults(self): defaults = {} for default_file_path in ( self.user_defaults_file_path, self.group_defaults_file_path, ): if default_file_path.is_file(): with default_file_path.open('r') as default_file: data = qtoml.load(default_file) defaults.update(data) return defaults def get_template_syntax(self, template_name): jinja_envs = DEFAULT_JINJA_ENVS.copy() templates = {} for syntax_file_path in ( self.store.user_path.joinpath(TEMPLATE_SYNTAX_NAME), self.store.group_path.joinpath(TEMPLATE_SYNTAX_NAME), ): if syntax_file_path.is_file(): with syntax_file_path.open('r') as syntax_file: syntax = qtoml.load(syntax_file) jinja_envs.update(syntax.get('jinja_env', {})) templates.update(syntax.get('templates', {})) template_syntax_name = templates.get(template_name, None) template_syntax = jinja_envs.get(template_syntax_name, None) return template_syntax def get_jinja_env(self, template_name): syntax = self.get_template_syntax(template_name=template_name) env = jinja2.Environment( loader=jinja2.ChoiceLoader([ jinja2.FileSystemLoader( str(self.store.group_path.joinpath(TEMPLATE_PATH_NAME))), jinja2.FileSystemLoader( str(self.store.user_path.joinpath(TEMPLATE_PATH_NAME))), jinja2.PackageLoader('zeitig', 'templates'), ]), **syntax ) return env def get_template(self, template_name): env = self.get_jinja_env(template_name) template = env.get_template(template_name) return template class Report(Templates): def __init__(self, store, *, start, end): super().__init__(store) self.start = start self.end = end def render(self, template_name=None): context = self.join_template_defaults() context.update({ 'py': { 'isinstance': isinstance, }, 'report': { 'start': self.start, 'end': self.end, 'group': self.store.group_path.name, 'source': sourcing.Sourcerer(self.store) .generate(start=self.start, end=self.end), }, 'events': { 'Summary': aggregates.Summary, 'DatetimeChange': aggregates.DatetimeChange, 'DatetimeStats': aggregates.DatetimeStats, 'Work': events.Work, 'Break': events.Break, 'Situation': events.Situation, 'filter_no_breaks': aggregates.filter_no_breaks, 'split_at_new_day': aggregates.split_at_new_day, 'pipeline': utils.pipeline, }, 'c': crayons, }) try: template = self.get_template(template_name) rendered = template.render(**context) except jinja2.exceptions.TemplateAssertionError as ex: log.error('%s at line %s', ex, ex.lineno) raise except jinja2.exceptions.TemplateNotFound as ex: raise ReportTemplateNotFound(*sorted(ex.__dict__.items())) return rendered def print(self, *, template_name=None): print(self.render(template_name=template_name)) PK!EkS#S#zeitig/scripts.py# Copyright 2018 Oliver Berger # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """ events ====== z [] work [] [-t ...] [-n ] z [] break [] [-t ...] [-n ] z [] add [] [-t ...] [-n ] z [] remove [] [-t ...] [-n] reports ======= z [] report templates ========= z [] template defaults get z [] template defaults set - maintaining =========== z [] maintain undo z [] maintain list """ import logging import click import crayons import pendulum import qtoml from . import events, reporting, store, utils, sourcing log = logging.getLogger(__name__) def run(): return cli(obj=utils.adict(), auto_envvar_prefix='ZEITIG') class AliasedGroup(click.Group): def _match_commands(self, ctx, cmd_name): matches = [x for x in self.list_commands(ctx) if x.startswith(cmd_name)] return matches def parse_args(self, ctx, args): """Introduce an empty argument for the optional group. Thus there are always 2 arguments provided. """ if args: matches = self._match_commands(ctx, args[0]) if matches: if len(args) == 1 or not self._match_commands(ctx, args[1]): args.insert(0, '') super().parse_args(ctx, args) def get_command(self, ctx, cmd_name): """Matches substrings of commands.""" rv = click.Group.get_command(self, ctx, cmd_name) if rv is not None: return rv matches = self._match_commands(ctx, cmd_name) if not matches: return None elif len(matches) == 1: return click.Group.get_command(self, ctx, matches[0]) ctx.fail('Too many matches: %s' % ', '.join(sorted(matches))) class PendulumLocal(click.ParamType): name = 'timestamp' def convert(self, value, param, ctx): try: p = pendulum.parse(value, tz=events.local_timezone) return p except: self.fail(f'`{value}` is not a valid timestamp string', param, ctx) def __repr__(self): return 'TIMESTAMP' @click.group(cls=AliasedGroup, invoke_without_command=True) @click.argument('group', required=False) @click.pass_context def cli(ctx, group): # logging.basicConfig(level=logging.DEBUG) now = utils.utcnow() ev_store = store.Store(group=group) ctx.obj.update({ 'now': now, 'store': ev_store }) if ctx.invoked_subcommand is None: state = reporting.State(ev_store) state.print(cli.get_help(ctx)) @cli.command('work') @click.option('tags', '-t', '--tag', multiple=True) @click.option('-n', '--note') @click.argument('when', required=False, type=PendulumLocal()) @click.pass_obj def cli_work(obj, tags, note, when): """Change to or start the `work` situation.""" when = (when or obj['now']).in_tz('UTC') event = events.WorkEvent(when=when) if tags: event.tags = tags if note: event.note = note obj.store.persist(event) click.echo(event) @cli.command('break') @click.option('tags', '-t', '--tag', multiple=True) @click.option('-n', '--note', default=None) @click.argument('when', required=False, type=PendulumLocal()) @click.pass_obj def cli_break(obj, tags, note, when): """Change to or start the `break` situation.""" when = (when or obj['now']).in_tz('UTC') event = events.BreakEvent(when=when) if tags: event.tags = tags if note: event.note = note obj.store.persist(event) click.echo(event) @cli.command('add') @click.option('tags', '-t', '--tag', multiple=True) @click.option('-n', '--note', default=None) @click.argument('when', required=False, type=PendulumLocal()) @click.pass_obj def cli_add(obj, tags, note, when): """Apply tags and notes.""" when = (when or obj['now']).in_tz('UTC') event = events.AddEvent(when=when) if tags: event.tags = tags if note: event.note = note obj.store.persist(event) click.echo(event) class Regex(click.ParamType): name = 'regex' def convert(self, value, param, ctx): import re try: regex = re.compile(value) return regex except re.error: self.fail(f'`{value}` is not a valid regular expression value', param, ctx) def __repr__(self): return 'REGEX' @cli.command('remove') @click.option('tags', '-t', '--tag', multiple=True, help='Remove a tag.') @click.option('-n', '--note', default=None, type=Regex(), help='Flush notes matching this regex.') @click.argument('when', required=False, type=PendulumLocal()) @click.pass_obj def cli_remove(obj, tags, note, when): """Remove tags and flush notes.""" when = (when or obj['now']).in_tz('UTC') event = events.RemoveEvent(when=when) if tags: event.tags = tags if note: event.note = note obj.store.persist(event) click.echo(event) @cli.command('report') @click.option('-s', '--start', type=PendulumLocal()) @click.option('-e', '--end', type=PendulumLocal()) @click.option('-t', '--template', default='console', help='A template to render the report.') @click.pass_obj def cli_report(obj, start, end, template): """Create a report of your events.""" end = (end or obj['now']).in_tz('UTC') report = reporting.Report(obj.store, start=start, end=end) try: report.print(template_name=template) except reporting.ReportTemplateNotFound: click.echo(click.style(f'Template not found: {template}', fg='red')) exit(1) @cli.group('template') def cli_template(): """Template commands.""" @cli_template.group('defaults') def cli_template_defaults(): """Template defaults commands.""" @cli_template_defaults.command('get') @click.pass_obj def cli_template_defaults_get(obj): """Show the actual template defaults for this user or group.""" templates = reporting.Templates(obj.store) defaults_file_path = (templates.group_defaults_file_path if obj.store.group else templates.user_defaults_file_path) if defaults_file_path.is_file(): with defaults_file_path.open('r') as f: click.echo(f.read()) @cli_template_defaults.command('set') @click.argument('defaults', type=click.File('r')) @click.pass_obj def cli_template_defaults_set(obj, defaults): """Set the template defaults for this user or group.""" data = defaults.read() try: qtoml.loads(data) except qtoml.decoder.TOMLDecodeError as ex: click.echo(crayons.red(f'{ex.__class__.__name__}: {ex.args[0]}')) exit(1) templates = reporting.Templates(obj.store) defaults_file_path = (templates.group_defaults_file_path if obj.store.group else templates.user_defaults_file_path) with defaults_file_path.open('w') as f: f.write(data) @cli_template_defaults.command('join') @click.pass_obj def cli_template_defaults_join(obj): """Show the joined actual template defaults.""" templates = reporting.Templates(obj.store) defaults = templates.join_template_defaults() click.echo(qtoml.dumps(defaults)) @cli.group('maintain') def cli_maintain(): """Maintaining commands.""" @cli_maintain.command('list') @click.pass_obj def cli_maintain_list(obj): """Show the list of events sorted by creation time.""" sourcerer = sourcing.Sourcerer(obj.store) for event_path, event_stat in obj.store.iter_names_created(): event_ctime = pendulum.from_timestamp(event_stat.st_ctime, tz='UTC') event = sourcerer.load_event(store.EventSource(event_path.name)) click.echo(f'{event_ctime} {event}') @cli_maintain.command('undo') @click.pass_obj def cli_maintain_undo(obj): """Undo the last event.""" last_event_path, last_event_stat = \ next(reversed(obj.store.iter_names_created()), (None, None)) if last_event_path: sourcerer = sourcing.Sourcerer(obj.store) event = sourcerer.load_event(store.EventSource(last_event_path.name)) click.echo(crayons.yellow(f'Undoing event: {event}')) click.echo(f'Location: {last_event_path}') click.echo() click.echo(qtoml.dumps(dict(event.source()))) if click.confirm('Are you sure?'): last_event_path.unlink() PK!]}}zeitig/sourcing.py# Copyright 2018 Oliver Berger # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import logging import pendulum from . import events, utils log = logging.getLogger(__name__) SITUATION = utils.adict(WORK='work', BREAK='break') ACTION = utils.adict(ADD='add', REMOVE='remove') class Sourcerer: """Collect all intervals within a given frame. We cut events at the borders of the time frame but let lazy applied tags and notes take effect. """ events = {} def __init__(self, store): self.store = store def load_event(self, link): try: return self.events[link] except KeyError: event = self.store.load(link.name) assert event.when == link.when, 'Do not mess with the files!' self.events[link] = event return event def generate(self, *, start=None, end=None): """Generate all intervals within this time frame.""" current_situation = None for link in self.store.iter_names(): log.debug('Found event source: %s', link) if start and link.when < start: continue if end and link.when >= end: # finish situation break # find first event event = self.load_event(link) if current_situation is None: if (not start or start == link.when)\ and isinstance(event, events.SituationEvent): current_situation = event.create_situation() continue else: # assemble state of first event current_situation = self._find_situation_before(link) # trim to fit start current_situation.start = start # apply events if isinstance(event, events.SituationEvent): # close situation, yield it and create a new one new_situation = event.close_situation(current_situation) yield current_situation current_situation = new_situation else: event.apply_to_situation(current_situation) if current_situation: # quasi close last situation current_situation.end = utils.utcnow() if end: current_situation.end = end yield current_situation def _find_situation_before(self, link): """ :param link: the store link from where we start to search. """ before = None for before in link.before(): event = self.load_event(before) if isinstance(event, events.SituationEvent): situation = event.create_situation() # switch to next before before = before.next break else: # default situation is a break situation = events.Break() # apply events to situation until link while before and before is not link: event = self.load_event(before) event.apply_to_situation(situation) before = before.next return situation PK!Jzeitig/store.py# Copyright 2018 Oliver Berger # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import getpass import itertools import logging import os import pathlib import pendulum import qtoml from . import events, utils log = logging.getLogger(__name__) CONFIG_NAME = '.zeitig' SOURCE_NAME = 'source' GROUPS_NAME = 'groups' LAST_NAME = 'last' DEFAULT_CONFIG_PATHS = [pathlib.Path(p).expanduser() for p in ( '~/.local/share/zeitig', '~/.config/zeitig', )] CONFIG_STORE_ENV_NAME = 'ZEITIG_STORE' class LastPathNotSetException(Exception): pass def find_config_store(cwd=None): """Find the config store base directory.""" config_path = os.environ.get(CONFIG_STORE_ENV_NAME) config_path_override = [pathlib.Path(config_path).expanduser()]\ if config_path else [] if cwd is None: cwd = pathlib.Path.cwd() else: cwd = pathlib.Path(cwd).resolve() for config_path in itertools.chain( config_path_override, map(lambda p: p.joinpath(CONFIG_NAME), itertools.chain((cwd,), cwd.parents)), DEFAULT_CONFIG_PATHS ): try: if config_path.resolve().is_dir(): return config_path except FileNotFoundError: pass else: # create default config_path.mkdir() return config_path class Link: def __init__(self): self.previous = None self.next = None def before(self): if self.previous: yield self.previous yield from self.previous.before() def after(self): if self.next: yield self.next yield from self.next.after() def __iter__(self): return iter(self.after()) @property def head(self): """Find the last element.""" current = self while current.next: current = current.next return current @property def root(self): """Find the first element.""" current = self while current.previous: current = current.previous return current def insert(self, next): """Insert a next chain after this link.""" if self.next is not None: self.next.previous, next.next = next, self.next next.previous, self.next = self, next @classmethod def from_sequence(cls, seq): previous = None for item in seq: src = cls(item) if previous: previous.insert(src) yield src previous = src class EventSource(Link): def __init__(self, name): super().__init__() self.name = name @utils.reify def when(self): when = pendulum.parse(self.name).in_tz('UTC') return when def __repr__(self): return f'<{self.__class__.__name__} {self.when}>' class Store: """Handle persisting and loading of event sources. The default lookup precedence for the store is:: - ./.zeitig - ~/.config/zeitig - ~/.local/share/zeitig The user has to explicitelly create the local store `./.zeitig` to be used. If a local store is found in the parent directories that one is used. """ user = getpass.getuser() def __init__(self, store_path=None, group=None): self.store_path = store_path if store_path else find_config_store() self.group = group def __repr__(self): return f'{self.__class__.__name__}: {self.store_path} [{self.group}]' def iter_names_created(self): """The list of event paths sorted after creation time.""" paths = sorted(((x, x.stat()) for x in self.source_path.iterdir()), key=lambda x: (x[1].st_ctime, x[1].st_mtime, x[0].name)) return paths def iter_names(self): """Create a double linked list of all event dates.""" paths = sorted(map(lambda x: x.name, self.source_path.iterdir())) return EventSource.from_sequence(paths) @utils.reify def user_path(self): user_path = self.store_path.joinpath(self.user) if not user_path.is_dir(): user_path.mkdir(parents=True) return user_path @utils.reify def group_path(self): if not self.group and not self.last_path.is_symlink(): raise LastPathNotSetException( f'You need to link {self.last_path} to a group' ) group_path = self.user_path.joinpath(GROUPS_NAME, self.group)\ if self.group else self.last_group_path if not group_path.is_dir(): group_path.mkdir(parents=True) return group_path @utils.reify def source_path(self): source_path = self.group_path.joinpath(SOURCE_NAME) if not source_path.is_dir(): source_path.mkdir(parents=True) return source_path @utils.reify def last_path(self): last_path = self.user_path.joinpath(LAST_NAME) return last_path @utils.reify def last_group(self): try: return self.last_group_path.name except LastPathNotSetException: pass @utils.reify def last_source(self): try: last_path = self.last_path.resolve() except FileNotFoundError: pass else: if last_path.exists(): last_name = last_path.name return EventSource(last_name) @utils.reify def groups(self): group_base_path = self.user_path.joinpath(GROUPS_NAME) if not group_base_path.is_dir(): group_base_path.mkdir(parents=True) groups = [dir.name for dir in group_base_path.iterdir() if dir.is_dir()] return groups def persist(self, event): """Store the event.""" event_path = self.source_path.joinpath( str(event.when) ) source = dict(event.source()) with event_path.open('w') as event_file: qtoml.dump(source, event_file) log.info('Persisted event: %s', source) self.link_last_path() def link_last_path(self): """Point last path to the actual group path.""" if self.last_group != self.group: if self.last_path.exists(): self.last_path.unlink() self.last_path.symlink_to(self.group_path.relative_to(self.user_path)) @utils.reify def last_group_path(self): if not self.last_path.is_symlink(): raise LastPathNotSetException( f'You need to link {self.last_path} to a group') resolved_last_path = self.last_path.resolve() # this fixes old last paths, which point to an event and not to a group # since we may find the last event easily by sorting timestamps # it is mor maintable to point to the last group only if resolved_last_path.is_file(): group_path = resolved_last_path.parent.parent else: group_path = resolved_last_path return group_path def load(self, filename): event_path = self.source_path.joinpath(filename) with event_path.open('r') as event_file: event = events.Event(**qtoml.load(event_file)) return event PK!^Azeitig/templates/console{%- macro from_start() -%} {% if report.start %} from {{c.white(report.start.format("dddd D MMMM YYYY"), bold=True)}} {%- endif %} {%- endmacro -%} {%- macro until_end() -%} {% if report.end %} until {{c.white(report.end.format("dddd D MMMM YYYY"), bold=True)}} {% endif %} {%- endmacro -%} Working times for {{c.white(report.group, bold=True)}}{{from_start()}}{{until_end()}} {%- for event in events.pipeline( report.source, events.Summary.aggregate, events.filter_no_breaks, events.DatetimeChange.aggregate, events.DatetimeStats.aggregate ) -%} {%- if py.isinstance(event, events.DatetimeChange) -%} {%- if event.is_new_week -%} {{- '\nWeek: ' }}{{c.white('{}'.format(event.now.week_of_year), bold=True)}} {% endif -%} {%- endif -%} {%- if py.isinstance(event, events.Work) -%} {{- '\t'}}{{event.local_start.to_datetime_string()}} - {{event.local_end.to_time_string()}} - {{'{0:.2f}'.format(event.period.total_hours())-}} {%- if event.tags %} - {{", ".join(event.tags)}}{%- else %}{%- endif %} {% endif -%} {% if py.isinstance(event, events.Summary) -%} {{ '\nTotal hours: ' }}{{c.white('{0:.2f}'.format(event.works.total_hours()), bold=True)}} {%- endif -%} {% if py.isinstance(event, events.DatetimeStats) -%} {{ '\nTotal days: ' }}{{c.white('{0}'.format(event.working_days|count), bold=True)-}} {{ '\nHours per day: ' }}{{c.white('{0:.2f}'.format(event.hours_per_working_day), bold=True)}} {%- endif -%} {%- endfor -%} PK!'L4==zeitig/utils.pyimport functools import pendulum class reify: def __init__(self, wrapped): self.wrapped = wrapped functools.update_wrapper(self, wrapped) def __get__(self, inst, objtype=None): if inst is None: return self val = self.wrapped(inst) setattr(inst, self.wrapped.__name__, val) return val class adict(dict): def __init__(self, *args, **kwargs): self.__dict__ = self super().__init__(*args, **kwargs) def pipeline(*iterators): """Wrap all iterators around each other in the given order.""" pipeline = None for it in iterators: if pipeline is None: pipeline = it else: pipeline = it(pipeline) return pipeline def utcnow(): """Return utcnow.""" return pendulum.now(tz='UTC') PK!H1S%('zeitig-0.3.2.dist-info/entry_points.txtN+I/N.,()媲J,L׃XqqPK!]{],],zeitig-0.3.2.dist-info/LICENSE Apache License Version 2.0, January 2004 http://www.apache.org/licenses/ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION 1. Definitions. "License" shall mean the terms and conditions for use, reproduction, and distribution as defined by Sections 1 through 9 of this document. "Licensor" shall mean the copyright owner or entity authorized by the copyright owner that is granting the License. "Legal Entity" shall mean the union of the acting entity and all other entities that control, are controlled by, or are under common control with that entity. For the purposes of this definition, "control" means (i) the power, direct or indirect, to cause the direction or management of such entity, whether by contract or otherwise, or (ii) ownership of fifty percent (50%) or more of the outstanding shares, or (iii) beneficial ownership of such entity. "You" (or "Your") shall mean an individual or Legal Entity exercising permissions granted by this License. "Source" form shall mean the preferred form for making modifications, including but not limited to software source code, documentation source, and configuration files. "Object" form shall mean any form resulting from mechanical transformation or translation of a Source form, including but not limited to compiled object code, generated documentation, and conversions to other media types. "Work" shall mean the work of authorship, whether in Source or Object form, made available under the License, as indicated by a copyright notice that is included in or attached to the work (an example is provided in the Appendix below). "Derivative Works" shall mean any work, whether in Source or Object form, that is based on (or derived from) the Work and for which the editorial revisions, annotations, elaborations, or other modifications represent, as a whole, an original work of authorship. For the purposes of this License, Derivative Works shall not include works that remain separable from, or merely link (or bind by name) to the interfaces of, the Work and Derivative Works thereof. "Contribution" shall mean any work of authorship, including the original version of the Work and any modifications or additions to that Work or Derivative Works thereof, that is intentionally submitted to Licensor for inclusion in the Work by the copyright owner or by an individual or Legal Entity authorized to submit on behalf of the copyright owner. For the purposes of this definition, "submitted" means any form of electronic, verbal, or written communication sent to the Licensor or its representatives, including but not limited to communication on electronic mailing lists, source code control systems, and issue tracking systems that are managed by, or on behalf of, the Licensor for the purpose of discussing and improving the Work, but excluding communication that is conspicuously marked or otherwise designated in writing by the copyright owner as "Not a Contribution." "Contributor" shall mean Licensor and any individual or Legal Entity on behalf of whom a Contribution has been received by Licensor and subsequently incorporated within the Work. 2. Grant of Copyright License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable copyright license to reproduce, prepare Derivative Works of, publicly display, publicly perform, sublicense, and distribute the Work and such Derivative Works in Source or Object form. 3. Grant of Patent License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable (except as stated in this section) patent license to make, have made, use, offer to sell, sell, import, and otherwise transfer the Work, where such license applies only to those patent claims licensable by such Contributor that are necessarily infringed by their Contribution(s) alone or by combination of their Contribution(s) with the Work to which such Contribution(s) was submitted. If You institute patent litigation against any entity (including a cross-claim or counterclaim in a lawsuit) alleging that the Work or a Contribution incorporated within the Work constitutes direct or contributory patent infringement, then any patent licenses granted to You under this License for that Work shall terminate as of the date such litigation is filed. 4. Redistribution. You may reproduce and distribute copies of the Work or Derivative Works thereof in any medium, with or without modifications, and in Source or Object form, provided that You meet the following conditions: (a) You must give any other recipients of the Work or Derivative Works a copy of this License; and (b) You must cause any modified files to carry prominent notices stating that You changed the files; and (c) You must retain, in the Source form of any Derivative Works that You distribute, all copyright, patent, trademark, and attribution notices from the Source form of the Work, excluding those notices that do not pertain to any part of the Derivative Works; and (d) If the Work includes a "NOTICE" text file as part of its distribution, then any Derivative Works that You distribute must include a readable copy of the attribution notices contained within such NOTICE file, excluding those notices that do not pertain to any part of the Derivative Works, in at least one of the following places: within a NOTICE text file distributed as part of the Derivative Works; within the Source form or documentation, if provided along with the Derivative Works; or, within a display generated by the Derivative Works, if and wherever such third-party notices normally appear. The contents of the NOTICE file are for informational purposes only and do not modify the License. You may add Your own attribution notices within Derivative Works that You distribute, alongside or as an addendum to the NOTICE text from the Work, provided that such additional attribution notices cannot be construed as modifying the License. You may add Your own copyright statement to Your modifications and may provide additional or different license terms and conditions for use, reproduction, or distribution of Your modifications, or for any such Derivative Works as a whole, provided Your use, reproduction, and distribution of the Work otherwise complies with the conditions stated in this License. 5. Submission of Contributions. Unless You explicitly state otherwise, any Contribution intentionally submitted for inclusion in the Work by You to the Licensor shall be under the terms and conditions of this License, without any additional terms or conditions. Notwithstanding the above, nothing herein shall supersede or modify the terms of any separate license agreement you may have executed with Licensor regarding such Contributions. 6. Trademarks. This License does not grant permission to use the trade names, trademarks, service marks, or product names of the Licensor, except as required for reasonable and customary use in describing the origin of the Work and reproducing the content of the NOTICE file. 7. Disclaimer of Warranty. Unless required by applicable law or agreed to in writing, Licensor provides the Work (and each Contributor provides its Contributions) on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied, including, without limitation, any warranties or conditions of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A PARTICULAR PURPOSE. You are solely responsible for determining the appropriateness of using or redistributing the Work and assume any risks associated with Your exercise of permissions under this License. 8. Limitation of Liability. In no event and under no legal theory, whether in tort (including negligence), contract, or otherwise, unless required by applicable law (such as deliberate and grossly negligent acts) or agreed to in writing, shall any Contributor be liable to You for damages, including any direct, indirect, special, incidental, or consequential damages of any character arising as a result of this License or out of the use or inability to use the Work (including but not limited to damages for loss of goodwill, work stoppage, computer failure or malfunction, or any and all other commercial damages or losses), even if such Contributor has been advised of the possibility of such damages. 9. Accepting Warranty or Additional Liability. While redistributing the Work or Derivative Works thereof, You may choose to offer, and charge a fee for, acceptance of support, warranty, indemnity, or other liability obligations and/or rights consistent with this License. However, in accepting such obligations, You may act only on Your own behalf and on Your sole responsibility, not on behalf of any other Contributor, and only if You agree to indemnify, defend, and hold each Contributor harmless for any liability incurred by, or claims asserted against, such Contributor by reason of your accepting any such warranty or additional liability. END OF TERMS AND CONDITIONS APPENDIX: How to apply the Apache License to your work. To apply the Apache License to your work, attach the following boilerplate notice, with the fields enclosed by brackets "[]" replaced with your own identifying information. (Don't include the brackets!) The text should be enclosed in the appropriate comment syntax for the file format. We also recommend that a file or class name and description of purpose be included on the same "printed page" as the copyright notice for easier identification within third-party archives. Copyright [yyyy] [name of copyright owner] Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. PK!HڽTUzeitig-0.3.2.dist-info/WHEEL A н#Z;/"d&F[xzw@Zpy3Fv]\fi4WZ^EgM_-]#0(q7PK!H zeitig-0.3.2.dist-info/METADATAXis9޿B $$d'* 3)[mFR;1v d\>Lј*y`@So+>; .4bSFш BVB1/2J2l6\ ^i3O3te'ZlrXBI7kjmDwRa.Br1ԿbbU'!q~Gڻ`\0.O/@iB} DB(RA~R Kr[- .50yDn1H/yonhZ$HEW%IheVF\y4EdA1S}3d}.茹vJ\DfAQ%5hᇩ؅,? 2 %k)vnTu;&}svusyu~権u?I: TaޣWo=[Oi=[-[pآ]/KY&x,NL06Cp}{:@eV!!yan7CxcJ l Yȳςd:48SRBR y3OkJ<ɦUܔBޡ؄j)0OdI׋)bS2S]. R  An7V <7w7e fmosi+O3HVàpHkܐݮy:Id/ ]dSr΂h0e` 0DLhp@iǪjͅn<%mBpY= W6VcMHޞ F@f d"bb'Y5BMt3wРz#3:Y l8>E.:Y;XTӔY%IBz146@{n鳑Y] ŬC=Q7X#@eHtvW5c! A FhG7H?]ȮyUMSx{<T"j6IJm4L09NfX\緀*)-H-M>hΜ {ѵSTs_Ua2WsS\k0PK!Hmizeitig-0.3.2.dist-info/RECORDu˒H}_ Tq\R!RL$^t5nADP$F$I>ڙoNIA0dbHD_J[vCЈA{^I'~.Eaq!l1V͵¦_y&6LXWG SdF t(ޓ̧avrV fv=Xl]\ZWuI0p5y=qKS8~|ѽcpVeD,m ߣe4,N.`r )N z8ImgW}jגБ\AXYSM PR)+b>{Gk4.[:9 GJ2;֤Xp |j-4_ڏ.}|&`s[YCP G3Ш6I۠?B&_ wu) kװK,/?VEc0u\T6u;YyYWd؟qӃRğHGԵj9=]KmNDh vpCIzv_PK!5zeitig/__init__.pyPK!׉Fzeitig/aggregates.pyPK!fB""zeitig/debug.pyPK!^m^)^)Pzeitig/events.pyPK!B_bbGzeitig/reporting.pyPK!EkS#S#ofzeitig/scripts.pyPK!]}}zeitig/sourcing.pyPK!Jzeitig/store.pyPK!^Aozeitig/templates/consolePK!'L4==zeitig/utils.pyPK!H1S%('zeitig-0.3.2.dist-info/entry_points.txtPK!]{],],zeitig-0.3.2.dist-info/LICENSEPK!HڽTUzeitig-0.3.2.dist-info/WHEELPK!H zeitig-0.3.2.dist-info/METADATAPK!Hmizeitig-0.3.2.dist-info/RECORDPK