PK zeHI" I" photofs/__init__.py#!/usr/bin/env python
# coding: utf-8
# photofs
# Copyright (C) 2012-2016 Moses Palmér
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
import os
import stat
import threading
import time
# For FUSE
import errno
import fuse
from ._image import Image, FileBasedImage
from ._source import ImageSource
from ._tag import Tag
# Import the actual image sources
from .sources import *
class PhotoFS(fuse.LoggingMixIn, fuse.Operations):
    """An implementation of a *FUSE* file system.

    It presents the tagged image libraries from image sources as a tag tree in
    the file system.

    Errors raised by this class follow the convention used in the rest of the
    package: the exception arguments are a format string followed by its
    values, and the message is formatted by whoever reports the error.

    :param str mountpoint: The directory on which the file system is mounted.
        Its ``lstat`` result is reported for every directory.

    :param str source: The name of the image source, as registered in
        :attr:`ImageSource.SOURCES`.

    :param bool use_links: Whether to report file based images as links.

    :param filters: A mapping from top level directory to filtering
        function. If this is falsy, no filters are used, and root tags are used
        to populate the root directory.
    :type filters: dict or None

    :param str date_format: The date format string used to construct file names
        from time stamps. It is stored in ``Image.DATE_FORMAT``.

    :param kwargs: Any other keyword arguments are passed on unchanged to the
        constructor of the selected image source.

    :raises RuntimeError: if the mount point cannot be examined
    """

    def __init__(
            self,
            mountpoint,
            # NOTE: this default is evaluated once, when the class is defined
            source=list(ImageSource.SOURCES.keys())[0],
            use_links=False,
            filters=None,
            date_format='%Y-%m-%d, %H.%M',
            **kwargs):
        super(PhotoFS, self).__init__()

        self.source = source
        self.use_links = use_links
        # Never share a mutable default between instances
        self.filters = {} if filters is None else filters
        Image.DATE_FORMAT = date_format

        self.creation = None
        self.dirstat = None
        self.image_source = None

        # Maps the handle identifiers returned by open to (handle, lock)
        self.handles = {}

        # Create the image source
        self.image_source = ImageSource.get(self.source)(**kwargs)

        try:
            # Store the current time as timestamp for directories
            self.creation = int(time.time())

            # Use the lstat result of the mount point for all directories
            self.dirstat = os.lstat(mountpoint)

        except Exception as e:
            # Exceptions in this package carry (format, *values); fall back on
            # the plain string for exceptions that do not, such as OSError
            try:
                reason = e.args[0] % e.args[1:]
            except Exception:
                reason = str(e)
            raise RuntimeError(
                'Failed to initialise file system: %s',
                reason)

    def destroy(self, path):
        """Called when the file system is destroyed; nothing to clean up."""
        pass

    def recursive_filter(self, item, include):
        """The recursive filter used to actually filter the image source.

        For an :class:`Image`, ``include`` decides. For a :class:`Tag`, this
        method calls itself on every value of the tag and keeps the tag if at
        least one of them is kept. Anything else is dropped.

        :param item: The item to filter.
        :type item: Image or Tag

        :param include: The filter function, called with an image, or ``None``
            if no filter is in use, in which case every image is kept.
        :type include: callable or None

        :return: a truthy value if the item should be kept and a falsy one
            otherwise
        """
        if isinstance(item, Image):
            # locate yields None as filter when no filters are registered
            return include is None or include(item)

        if isinstance(item, Tag):
            return any(
                self.recursive_filter(child, include)
                for child in item.values())

        return False

    def locate(self, path):
        """Locates a filter function and an image or tag resource.

        If the path denotes the root, the filters are returned as resource if
        any are registered, otherwise :attr:`self.image_source`.

        :param str path: The absolute path of the resource. This must begin
            with :attr:`os.path.sep`.

        :return: the tuple ``(include, resource)``, where ``include`` is
            ``None`` if no filters are registered

        :raises KeyError: if the resource does not exist using the filter
        """
        # The root path corresponds to the filters, if any registered, or the
        # image source root tags
        if path == os.path.sep:
            return (None, self.filters or self.image_source)

        if not self.filters:
            return (
                None,
                self.image_source.locate(path) if path
                else self.image_source)

        # If any filters are registered, the first part of the path is the
        # filter name; the filter must allow the item
        root, rest = self.split_path(path)
        include = self.filters[root]
        if not rest:
            # The filter directory itself shows the image source root
            return (include, self.image_source.locate(os.path.sep))

        item = self.image_source.locate(os.path.sep + rest)
        if not self.recursive_filter(item, include):
            raise KeyError(path)
        return (include, item)

    def split_path(self, path):
        """Returns the tuple ``(root, rest)`` for a path, where ``root`` is the
        directory immediately beneath the root and ``rest`` is anything after
        that.

        :param str path: The path to split. This must begin with
            :attr:`os.path.sep`.

        :return: a tuple containing the split path, which may be empty strings

        :raises ValueError: if ``path`` does not begin with :attr:`os.path.sep`
        """
        # startswith also rejects the empty string instead of failing on [0]
        if not path.startswith(os.path.sep):
            raise ValueError(
                '%s is not a valid path',
                path)

        root, _, rest = path[len(os.path.sep):].partition(os.path.sep)
        return (root, rest)

    def getattr(self, path, fh=None):
        """Returns the attributes of the item at ``path``.

        Write permission bits are always removed from the reported mode.

        :param str path: The absolute path of the item.

        :param fh: Unused.

        :return: a mapping of ``st_*`` names to values

        :raises fuse.FuseOSError: with ``ENOENT`` if the item does not exist

        :raises RuntimeError: if the located item is of an unknown type
        """
        try:
            item = self.locate(path)[1]
        except KeyError:
            raise fuse.FuseOSError(errno.ENOENT)

        if self.use_links and isinstance(item, FileBasedImage):
            # This is a link
            st = os.stat_result(
                (item.stat[0] | stat.S_IFLNK,) + item.stat[1:])

        elif isinstance(item, Image):
            # This is a file
            st = item.stat

        elif isinstance(item, dict):
            # This is a directory; this matches Tag, ImageSource and the
            # filter mapping returned for the root
            st = self.dirstat

        else:
            raise RuntimeError(
                'Unknown object: %s',
                path)

        write_bits = stat.S_IWGRP | stat.S_IWUSR | stat.S_IWOTH
        return dict(
            # Remove write permission bits
            st_mode=st.st_mode & ~write_bits,
            st_gid=st.st_gid,
            st_uid=st.st_uid,
            st_nlink=st.st_nlink,
            st_atime=st.st_atime,
            st_ctime=st.st_ctime,
            st_mtime=st.st_mtime,
            st_size=st.st_size)

    def readdir(self, path, offset):
        """Lists the names in the directory at ``path``.

        The root lists the filter names if filters are registered, otherwise
        the keys of the image source. Other directories list only the entries
        accepted by :meth:`recursive_filter`.

        :param str path: The absolute path of the directory.

        :param offset: Unused.

        :return: a list of names

        :raises fuse.FuseOSError: with ``ENOENT`` if the directory does not
            exist

        :raises RuntimeError: if the located item is not a directory
        """
        if path == os.path.sep:
            return list(self.filters or self.image_source)

        try:
            include, item = self.locate(path)
        except KeyError:
            raise fuse.FuseOSError(errno.ENOENT)

        if not isinstance(item, dict):
            raise RuntimeError(
                'Unknown object: %s',
                path)

        # This is a directory; this matches both Tag and ImageSource
        return [
            name
            for name, child in item.items()
            if self.recursive_filter(child, include)]

    def readlink(self, path):
        """Returns the target of the link at ``path``.

        :param str path: The absolute path of the link.

        :return: the ``location`` attribute of the located item

        :raises fuse.FuseOSError: with ``ENOENT`` if the item does not exist,
            or ``EINVAL`` if its location cannot be read
        """
        try:
            item = self.locate(path)[1]
        except KeyError:
            raise fuse.FuseOSError(errno.ENOENT)

        try:
            return item.location
        except Exception:
            raise fuse.FuseOSError(errno.EINVAL)

    def open(self, path, flags):
        """Opens the image at ``path``.

        The opened handle is stored together with a lock that serialises
        access to it in :meth:`read` and :meth:`release`.

        :param str path: The absolute path of the image.

        :param flags: The flags passed on to the ``open`` method of the image.

        :return: the identifier of the handle, to be passed as ``fh``

        :raises fuse.FuseOSError: with ``ENOENT`` if the item does not exist,
            or ``EINVAL`` if it is not an image
        """
        try:
            item = self.locate(path)[1]
        except KeyError:
            raise fuse.FuseOSError(errno.ENOENT)

        if not isinstance(item, Image):
            raise fuse.FuseOSError(errno.EINVAL)

        handle = item.open(flags)
        fh = id(handle)
        self.handles[fh] = (handle, threading.Lock())
        return fh

    def release(self, path, fh):
        """Closes a handle returned by :meth:`open` and forgets it.

        :param str path: Unused.

        :param fh: The handle identifier.

        :raises fuse.FuseOSError: with ``EINVAL`` if the handle is unknown or
            cannot be closed
        """
        # Forget the handle first, so that it does not linger if close fails
        try:
            handle, lock = self.handles.pop(fh)
        except KeyError:
            raise fuse.FuseOSError(errno.EINVAL)

        try:
            with lock:
                handle.close()
        except Exception:
            raise fuse.FuseOSError(errno.EINVAL)

    def read(self, path, size, offset, fh):
        """Reads data from a handle returned by :meth:`open`.

        :param str path: Unused.

        :param int size: The number of bytes to read.

        :param int offset: The position from which to read.

        :param fh: The handle identifier.

        :return: the result of calling ``read(size)`` on the handle

        :raises fuse.FuseOSError: with ``EBADF`` if the handle is unknown
        """
        try:
            handle, lock = self.handles[fh]
        except KeyError:
            raise fuse.FuseOSError(errno.EBADF)

        with lock:
            # Only seek when the handle is not already in position
            if handle.tell() != offset:
                handle.seek(offset)
            return handle.read(size)
PK \eH]! ]! photofs/_source.py#!/usr/bin/env python
# coding: utf-8
# photofs
# Copyright (C) 2012-2016 Moses Palmér
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
import os
from ._util import make_unique
from ._tag import Tag
class ImageSource(dict):
    """A source of images and tags.

    The source is itself a mapping from root level names to items.

    Errors raised by this class carry a format string followed by its values
    as exception arguments; the message is formatted by whoever reports it.

    This is an abstract class.
    """

    #: A mapping of all registered sources by name to implementing classes
    SOURCES = {}

    @classmethod
    def add_arguments(cls, argparser):
        """Adds all command line arguments for this image source to an argument
        parser.

        This implementation adds none.

        :param argparse.ArgumentParser argparser: The argument parser to which
            to add arguments.
        """
        pass

    @classmethod
    def register(cls, name):
        """A decorator that registers an :class:`ImageSource` subclass as an
        image source.

        :param str name: The name of the image source.

        :return: a class decorator that stores the decorated class in
            :attr:`SOURCES` and returns it unchanged
        """
        def inner(source_class):
            cls.SOURCES[name] = source_class
            return source_class

        return inner

    @classmethod
    def get(cls, name):
        """Returns the class responsible for a named image source.

        :param str name: The name of the source.

        :return: the corresponding class

        :raises ValueError: if ``name`` is not a known image source
        """
        try:
            return cls.SOURCES[name]
        except KeyError:
            raise ValueError('%s is not a valid image source', name)

    def _break_path(self, path):
        """Breaks an absolute path into its segments.

        :param str path: The absolute path to break, for example
            ``'/Tag/Other/Third'``, which will yield
            ``['Tag', 'Other', 'Third']``. This string must begin with
            :attr:`os.path.sep`.

        :raises ValueError: if path does not begin with os.path.sep; this
            includes the empty string

        :return: the path elements; the root path yields an empty list

        :rtype: [str]
        """
        # startswith also rejects the empty string instead of failing on [0]
        if not path.startswith(os.path.sep):
            raise ValueError(
                '"%s" does not begin with "%s"',
                path,
                os.path.sep)

        if path == os.path.sep:
            return []

        return path.split(os.path.sep)[1:]

    def _make_unique(self, directory, base_name, ext):
        """Creates a unique key in the map ``directory``.

        See :func:`make_unique` for more information.

        :param dict directory: The map in which to create the unique key.

        :param str base_name: The name of the file without extension.

        :param str ext: The file extension. A dot (``'.'``) is not added
            automatically; this must be present in ``ext``.

        :return: a unique key

        :rtype: str
        """
        return make_unique(directory, base_name, '%s%s', '%s (%d)%s', ext)

    def _make_tags(self, path):
        """Makes sure that all tags up until the last element of ``path`` exist.

        :param str path: The absolute path of the tag to make, for example
            ``'/Tag/Other/Third'``. This string must begin with
            :attr:`os.path.sep`.

        :raises ValueError: if ``path`` does not begin with :attr:`os.path.sep`

        :return: the last tag; ``Third`` in the example above. For the root
            path, the source itself is returned.

        :rtype: Tag
        """
        current = self
        for segment in self._break_path(path):
            if segment in current:
                current = current[segment]
                continue

            # Compare by identity: the source and the tags are mappings, so
            # equality would compare their contents
            at_root = current is self
            tag = Tag(segment, None if at_root else current)
            if at_root:
                # A root tag has no parent and must be added to the source
                # explicitly; for other tags, the parent argument above is
                # presumably what attaches the tag (verify against Tag)
                self[segment] = tag
            current = tag

        return current

    def __init__(self, **kwargs):
        """Creates a new ImageSource.

        This class accepts no keyword arguments; subclasses are expected to
        consume their own before calling this constructor.

        :raises ValueError: if any keyword arguments remain
        """
        if kwargs:
            raise ValueError(
                'Unsupported command line argument: %s',
                ', '.join(k for k in kwargs))

        super(ImageSource, self).__init__()

    def locate(self, path):
        """Locates an image or tag.

        :param str path: The absolute path of the item to locate, for example
            ``'/Tag/Other/Image.jpg'``. This string must begin with
            :attr:`os.path.sep`.

        :return: a tag or an image; the root path yields the source itself

        :rtype: Tag or Image

        :raises KeyError: if the item does not exist

        :raises ValueError: if path does not begin with os.path.sep
        """
        current = self
        for segment in self._break_path(path):
            current = current[segment]

        return current
class FileBasedImageSource(ImageSource):
    """An image source backed by a file or directory.

    The modification time of the backend resource decides when the images
    and tags are reloaded.

    This is an abstract class; subclasses provide :meth:`load_tags` and
    :attr:`default_location`.
    """

    @classmethod
    def add_arguments(cls, argparser):
        """Registers the ``--database`` command line argument.

        :param argparse.ArgumentParser argparser: The argument parser to which
            to add arguments.
        """
        argparser.add_argument(
            '--database',
            help=(
                'The database file to use. If not specified, the default one '
                'is used.'))

    def __init__(self, database=None, **kwargs):
        """Creates a new file based image source.

        :param str database: The path to the backend database or directory for
            this image source. If :meth:`refresh` is not overloaded, this must
            be a valid file name, since its modification time is read. If this
            is falsy, :attr:`default_location` is used.

        :raises ValueError: if no location was given and the default location
            is ``None``
        """
        super(FileBasedImageSource, self).__init__(**kwargs)

        # The default location is only consulted when no database is given
        location = database or self.default_location
        self._path = location
        if location is None:
            raise ValueError('No database')

        # No modification time has been seen yet
        self._timestamp = 0

    def load_tags(self):
        """Loads the tags from the backend resource.

        :meth:`refresh` calls this method after it has cleared the source.

        :return: a list of tags with the images attached

        :rtype: [Tag]
        """
        raise NotImplementedError()

    @property
    def default_location(self):
        """The location of the backend resource used when none is given.

        :return: the default location of the backend resource, or ``None`` if
            none exists

        :rtype: str or None
        """
        raise NotImplementedError()

    @property
    def path(self):
        """The path of the backend resource containing the images and tags."""
        return self._path

    @property
    def timestamp(self):
        """The modification time of the backend resource seen by the last
        reload."""
        return self._timestamp

    def refresh(self):
        """Reloads all images and tags if the backend resource has changed.

        The backend resource is considered changed when the modification time
        of :attr:`path` differs from the stored timestamp. In that case the
        timestamp is updated, the source is cleared and :meth:`load_tags` is
        called.

        If :attr:`path` is falsy, the modification time is not checked and the
        source is always reloaded.
        """
        if self.path:
            modified = os.stat(self._path).st_mtime
            if modified == self._timestamp:
                # Nothing has changed since the last reload
                return
            self._timestamp = modified

        # Release the old data and reload the tags
        self.clear()
        self.load_tags()

    def locate(self, path):
        """Locates an image or tag after refreshing the source.

        See :meth:`ImageSource.locate` for the parameters and return value.
        """
        self.refresh()
        return super(FileBasedImageSource, self).locate(path)
PK WeH photofs/_http.py# coding: utf-8
# photofs
# Copyright (C) 2012-2016 Moses Palmér
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
import urllib3
#: The connection pool shared by all requests made through this module
__http = urllib3.PoolManager()


def request(method, *args, **kwargs):
    """Initiates an HTTP request on the module level connection pool.

    All arguments are passed on unchanged to the ``request`` method of the
    shared ``urllib3.PoolManager``.

    :param str method: The HTTP verb.

    :return: whatever the pool manager returns for the request
    """
    send = __http.request
    return send(method, *args, **kwargs)
PK \eH