import os
from numbers import Integral
from utils.es import Elasticsearch
from elasticsearch import NotFoundError, ConflictError
from uuid import uuid4
from fsdb import Fsdb
from fsdb.hashtools import calc_file_digest, calc_digest
from copy import deepcopy
from urlparse import urlparse
from json import dumps
from libreantdb import DB
from exceptions import NotFoundException, FileOpNotSupported, ConflictException
from logging import getLogger
log = getLogger('archivant')


class Archivant():
    ''' Implementation of a Data Access Layer

    Archivant handles both an fsdb instance and a libreantdb one and
    exposes a high-level API to operate on 'volumes'.

    A 'volume' represents a physical/digital object stored within archivant.
    Volumes are structured as described in :meth:`~Archivant.normalize_volume`;
    in short, they have language, metadata and attachments.
    An attachment is an URL plus some metadata.

    If the FSDB_PATH parameter is not configured, fsdb will not be
    initialized and archivant will start in metadata-only mode.
    In metadata-only mode all file related functions will raise
    FileOpNotSupported.
    '''

    def __init__(self, conf=None):
        '''Build an Archivant from a configuration mapping.

        `conf` may contain the keys FSDB_PATH, ES_HOSTS and ES_INDEXNAME;
        missing keys default to None.

        Raises ValueError if ES_INDEXNAME is empty.
        '''
        defaults = {
            'FSDB_PATH': None,
            'ES_HOSTS': None,
            'ES_INDEXNAME': None
        }
        # A None default (instead of a shared `{}`) avoids the mutable
        # default argument pitfall; omitting `conf` behaves as before.
        defaults.update(conf or {})
        self._config = defaults
        log.debug('initializing with this config: ' + dumps(self._config))

        # initialize fsdb
        if self._config['FSDB_PATH']:
            self.__fsdb = Fsdb(self._config['FSDB_PATH'])
        else:
            # __fsdb is deliberately left unset: the `_fsdb` property turns
            # the resulting AttributeError into FileOpNotSupported.
            log.warning('It has not been set any value for FSDB_PATH, file operations will not be supported')

        # initialize elasticsearch
        if not self._config['ES_INDEXNAME']:
            raise ValueError('ES_INDEXNAME cannot be empty')
        # The DB object is created lazily on first access of `_db`.
        self.__db = None

    @property
    def _db(self):
        '''Lazily created libreantdb `DB` instance (set up on first use).'''
        if self.__db is None:
            db = DB(Elasticsearch(hosts=self._config['ES_HOSTS']),
                    index_name=self._config['ES_INDEXNAME'])
            db.setup_db()
            self.__db = db
        return self.__db

    @property
    def _fsdb(self):
        '''The fsdb instance.

        Raises FileOpNotSupported when no fsdb was initialized.
        '''
        try:
            return self.__fsdb
        except AttributeError:
            raise FileOpNotSupported("FSDB_PATH paramenter has not been set")

    def is_file_op_supported(self):
        '''Return True if file operations are available, False otherwise.'''
        try:
            self._fsdb
        except FileOpNotSupported:
            return False
        else:
            return True

    @staticmethod
    def normalize_volume(volume):
        '''convert volume metadata from es to archivant format

        This function makes side effect on input volume
        (its '_source' dict and attachment dicts are modified and reused).

        output example::

            {
                'id': 'AU0paPZOMZchuDv1iDv8',
                'type': 'volume',
                'metadata': {'_language': 'en',
                             'key1': 'value1',
                             'key2': 'value2',
                             'key3': 'value3'},
                'attachments': [{'id': 'a910e1kjdo2d192d1dko1p2kd1209d',
                                 'type' : 'attachment',
                                 'url': 'fsdb:///624bffa8a6f90813b7982d0e5b4c1475ebec40e3',
                                 'metadata': {'download_count': 0,
                                              'mime': 'application/json',
                                              'name': 'tmp9fyat_',
                                              'notes': 'this file is awsome',
                                              'sha1': '624bffa8a6f90813b7982d0e5b4c1475ebec40e3',
                                              'size': 10}
                                }]
            }

        A 'score' key is added only when the input carries '_score'.
        '''
        res = dict()
        res['type'] = 'volume'
        res['id'] = volume['_id']
        if '_score' in volume:
            res['score'] = volume['_score']

        source = volume['_source']
        attachments = source['_attachments']
        # Strip the es-only keys; what is left of `source` is the metadata.
        del source['_attachments']
        del source['_text_' + source['_language']]
        res['metadata'] = source

        res['attachments'] = [Archivant.normalize_attachment(a)
                              for a in attachments]
        return res

    @staticmethod
    def normalize_attachment(attachment):
        ''' Convert attachment metadata from es to archivant format

        This function makes side effect on input attachment
        ('id' and 'url' are removed from it and the remaining dict is
        reused as the 'metadata' section).
        '''
        res = dict()
        res['type'] = 'attachment'
        res['id'] = attachment.pop('id')
        res['url'] = attachment.pop('url')
        res['metadata'] = attachment
        return res

    @staticmethod
    def denormalize_volume(volume):
        '''convert volume metadata from archivant to es format

        Returns a tuple `(id, body)`; `id` is None when the volume has
        no 'id' key.
        '''
        volume_id = volume.get('id', None)
        res = dict()
        res.update(volume['metadata'])
        res['_attachments'] = [Archivant.denormalize_attachment(a)
                               for a in volume['attachments']]
        return volume_id, res

    @staticmethod
    def denormalize_attachment(attachment):
        '''convert attachment metadata from archivant to es format

        Raises ValueError if the metadata section contains one of the
        special keys 'id' or 'url'.
        '''
        res = dict()
        for k in ('id', 'url'):
            if k in attachment['metadata']:
                raise ValueError("metadata section could not contain special key '{}'".format(k))
            res[k] = attachment[k]
        res.update(attachment['metadata'])
        return res

    def _req_raw_volume(self, volumeID):
        '''Fetch the raw (es format) volume.

        Raises NotFoundException if the db reports it as not found.
        '''
        try:
            return self._db.get_book_by_id(volumeID)
        except NotFoundError:
            raise NotFoundException("could not found volume with id: '{}'".format(volumeID))

    def import_volume(self, volume):
        '''Store an already normalized volume, keeping its id.

        Raises ConflictException if the db reports a conflict.
        '''
        _id, den_v = Archivant.denormalize_volume(volume)
        try:
            self._db.add_book(body=den_v, id=_id)
        except ConflictError:
            raise ConflictException("A volume with the same id already exists: '{}'".format(_id))

    def iter_all_volumes(self):
        '''iterate over all stored volumes

        Yielded volumes are normalized and never carry a 'score' key.
        '''
        for raw_volume in self._db.iterate_all():
            v = self.normalize_volume(raw_volume)
            # normalize_volume adds 'score' only when '_score' is present,
            # so a plain `del` would raise KeyError on score-less hits.
            v.pop('score', None)
            yield v

    def get_volume(self, volumeID):
        '''Return the normalized volume with the given id.

        Raises NotFoundException if the volume does not exist.
        '''
        log.debug("Requested volume with id:'{}'".format(volumeID))
        return Archivant.normalize_volume(self._req_raw_volume(volumeID))

    def get_attachment(self, volumeID, attachmentID):
        '''Return the normalized attachment of the given volume.

        Raises NotFoundException if the volume or the attachment
        does not exist.
        '''
        log.debug("Requested attachment '{}' of the volume '{}'".format(attachmentID, volumeID))
        rawVolume = self._req_raw_volume(volumeID)
        for rawAttachment in rawVolume['_source']['_attachments']:
            if rawAttachment['id'] == attachmentID:
                return Archivant.normalize_attachment(rawAttachment)
        raise NotFoundException("could not found attachment '{}' of the volume '{}'".format(attachmentID, volumeID))

    def get_file(self, volumeID, attachmentID):
        '''Resolve the url of an attachment through :meth:`_resolve_url`.'''
        log.debug("Requested file associated with attachment '{}' of the volume '{}'".format(attachmentID, volumeID))
        attachment = self.get_attachment(volumeID, attachmentID)
        return self._resolve_url(attachment['url'])

    def delete_attachments(self, volumeID, attachmentsID):
        ''' delete attachments from a volume

        `attachmentsID` is an iterable of attachment ids.
        Raises NotFoundException if the volume, or any of the requested
        attachments, does not exist; in that case nothing is modified.
        '''
        log.debug("deleting attachments from volume '{}': {}".format(volumeID, attachmentsID))
        rawVolume = self._req_raw_volume(volumeID)
        stored = rawVolume['_source']['_attachments']
        storedIDs = set(a['id'] for a in stored)
        # Materialize once so that one-shot iterables can be walked twice.
        requested = list(attachmentsID)

        # check that all requested files are present
        for attachmentID in requested:
            if attachmentID not in storedIDs:
                raise NotFoundException("could not found attachment '{}' of the volume '{}'".format(attachmentID, volumeID))

        # Filter in one pass: popping by indexes computed before the first
        # removal would hit the wrong element once the list has shifted.
        toRemove = set(requested)
        stored[:] = [a for a in stored if a['id'] not in toRemove]
        self._db.modify_book(volumeID, rawVolume['_source'], version=rawVolume['_version'])

    def delete_volume(self, volumeID):
        '''Delete a volume.

        Raises NotFoundException if the db reports it as not found.
        '''
        log.debug("Deleting volume: '{}'".format(volumeID))
        try:
            self._db.delete_book(volumeID)
        except NotFoundError:
            raise NotFoundException("could not found volume with id: '{}'".format(volumeID))

    def insert_attachments(self, volumeID, attachments):
        ''' add attachments to an already existing volume

        `attachments` has the same format described in
        :meth:`insert_volume`.

        Returns the list of ids of the added attachments.
        NOTE(review): returns None (not an empty list) when `attachments`
        is empty; kept for backward compatibility.
        '''
        log.debug("adding new attachments to volume '{}': {}".format(volumeID, attachments))
        if not attachments:
            return
        rawVolume = self._req_raw_volume(volumeID)
        attsID = list()
        for index, a in enumerate(attachments):
            try:
                rawAttachment = self._assemble_attachment(a['file'], a)
                rawVolume['_source']['_attachments'].append(rawAttachment)
                attsID.append(rawAttachment['id'])
            except Exception:
                # Log which entry failed, then let the caller handle it.
                log.exception("Error while elaborating attachments array at index: {}".format(index))
                raise
        self._db.modify_book(volumeID, rawVolume['_source'], version=rawVolume['_version'])
        return attsID

    def insert_volume(self, metadata, attachments=None):
        '''Insert a new volume

        Returns the ID of the added volume

        `metadata` must be a dict containing metadata of the volume::

            {
              "_language" : "it",  # language of the metadata
              "key1" : "value1",   # attribute
              "key2" : "value2",
              ...
              "keyN" : "valueN"
            }

            The only required key is `_language`

        `attachments` must be an array of dict::

            {
              "file"  : "/some/random/path"     # path or fp
              "name"  : "funny_name.ext"        # name of the file (extension included) [optional if a path was given]
              "mime"  : "application/json"      # mime type of the file [optional]
              "notes" : "this file is awesome"  # notes that will be attached to this file [optional]
            }

        Raises KeyError if a required metadata field is missing.
        '''
        # A None default (instead of a shared `[]`) avoids the mutable
        # default argument pitfall; omitting it still means "no attachments".
        if attachments is None:
            attachments = []
        log.debug("adding new volume:\n\tdata: {}\n\tfiles: {}".format(metadata, attachments))

        requiredFields = ['_language']
        for requiredField in requiredFields:
            if requiredField not in metadata:
                raise KeyError("Required field '{}' is missing".format(requiredField))

        # Work on a copy so the caller's metadata dict is left untouched.
        volume = deepcopy(metadata)

        attsData = []
        for index, a in enumerate(attachments):
            try:
                attsData.append(self._assemble_attachment(a['file'], a))
            except Exception:
                # Log which entry failed, then let the caller handle it.
                log.exception("Error while elaborating attachments array at index: {}".format(index))
                raise
        volume['_attachments'] = attsData

        log.debug('constructed volume for insertion: {}'.format(volume))
        addedVolume = self._db.add_book(body=volume)
        log.debug("added new volume: '{}'".format(addedVolume['_id']))
        return addedVolume['_id']

    def _assemble_attachment(self, file, metadata):
        ''' store file and return a dict containing assembled metadata

        param `file` must be a path or a File Object
        param `metadata` must be a dict::

            {
              "name"  : "funny_name.ext"        # name of the file (extension included) [optional if a path was given]
              "mime"  : "application/json"      # mime type of the file [optional]
              "notes" : "this file is awesome"  # notes about this file [optional]
            }

        Raises ValueError if `file` is neither an existing path nor a
        seekable file object, or if no name can be determined for it.
        '''
        res = dict()

        if isinstance(file, basestring) and os.path.isfile(file):
            res['name'] = metadata['name'] if 'name' in metadata else os.path.basename(file)
            res['size'] = os.path.getsize(file)
            res['sha1'] = calc_file_digest(file, algorithm="sha1")

        elif hasattr(file, 'read') and hasattr(file, 'seek'):
            if 'name' in metadata and metadata['name']:
                res['name'] = metadata['name']
            elif hasattr(file, 'name'):
                # The name belongs in the result dict; file objects do not
                # support item assignment.
                res['name'] = file.name
            else:
                raise ValueError("Could not assign a name to the file")

            # Size and digest are taken from the current position onwards;
            # the position is restored after each measurement.
            old_position = file.tell()
            file.seek(0, os.SEEK_END)
            res['size'] = file.tell() - old_position
            file.seek(old_position, os.SEEK_SET)

            res['sha1'] = calc_digest(file, algorithm="sha1")
            file.seek(old_position, os.SEEK_SET)

        else:
            raise ValueError("Unsupported file value type: {}".format(type(file)))

        res['id'] = uuid4().hex
        res['mime'] = metadata['mime'] if 'mime' in metadata else None
        res['notes'] = metadata['notes'] if 'notes' in metadata else ""
        res['download_count'] = 0
        fsdb_id = self._fsdb.add(file)
        res['url'] = "fsdb:///" + fsdb_id
        return res

    def update_volume(self, volumeID, metadata):
        '''update existing volume metadata

        the given metadata will substitute the old one
        (attachments are preserved).
        '''
        log.debug('updating volume metadata: {}'.format(volumeID))
        rawVolume = self._req_raw_volume(volumeID)
        normalized = self.normalize_volume(rawVolume)
        normalized['metadata'] = metadata
        _, newRawVolume = self.denormalize_volume(normalized)
        # NOTE(review): unlike the attachment operations, no `version` is
        # passed here - confirm whether that is intentional.
        self._db.modify_book(volumeID, newRawVolume)

    def update_attachment(self, volumeID, attachmentID, metadata):
        '''update an existing attachment

        the given metadata dict will be merged with the old one.
        only the following fields could be updated:
        [name, mime, notes, download_count]

        Raises ValueError on a non modifiable field or a wrongly typed
        value, NotFoundException if the volume or attachment is missing.
        '''
        log.debug('updating metadata of attachment {} from volume {}'.format(attachmentID, volumeID))
        modifiable_fields = ['name', 'mime', 'notes', 'download_count']
        for k in metadata:
            if k not in modifiable_fields:
                raise ValueError('Not modifiable field given: {}'.format(k))

        if 'name' in metadata and not isinstance(metadata['name'], basestring):
            raise ValueError("'name' must be a string")
        if 'mime' in metadata and not isinstance(metadata['mime'], basestring):
            raise ValueError("'mime' must be a string")
        if 'notes' in metadata and not isinstance(metadata['notes'], basestring):
            raise ValueError("'notes' must be a string")
        if 'download_count' in metadata and not isinstance(metadata['download_count'], Integral):
            raise ValueError("'download_count' must be a number")

        rawVolume = self._req_raw_volume(volumeID)
        for attachment in rawVolume['_source']['_attachments']:
            if attachment['id'] == attachmentID:
                attachment.update(metadata)
                self._db.modify_book(id=volumeID, body=rawVolume['_source'], version=rawVolume['_version'])
                return
        raise NotFoundException('Could not found attachment with id {} in volume {}'.format(attachmentID, volumeID))

    def _resolve_url(self, url):
        '''Look up the fsdb entry an attachment url points to.

        Only the "fsdb" scheme is supported; any other scheme raises
        Exception.
        '''
        parseResult = urlparse(url)
        if parseResult.scheme == "fsdb":
            # The fsdb id is the last component of the url path.
            return self._fsdb[os.path.basename(parseResult.path)]
        else:
            raise Exception("url scheme '{}' not supported".format(parseResult.scheme))

    def dangling_files(self):
        '''iterate over fsdb files no more attached to any volume'''
        for fid in self._fsdb:
            if not self._db.file_is_attached('fsdb:///' + fid):
                yield fid

    def shrink_local_fsdb(self, dangling=True, corrupted=True, dryrun=False):
        '''shrink local fsdb by removing dangling and/or corrupted files

        return number of deleted files

        With `dryrun` nothing is removed from fsdb.
        NOTE(review): the count is assumed to include files that a dry run
        would have removed - confirm against the intended behaviour.
        '''
        log.debug('shrinking local fsdb [danglings={}, corrupted={}]'.format(dangling, corrupted))
        count = 0
        if dangling:
            for fid in self.dangling_files():
                log.info("shrinking: removing dangling '{}'".format(fid))
                if not dryrun:
                    self._fsdb.remove(fid)
                count += 1
        if corrupted:
            for fid in self._fsdb.corrupted():
                log.info("shrinking: removing corrupted '{}'".format(fid))
                if not dryrun:
                    self._fsdb.remove(fid)
                count += 1
        return count