Source code for libreantdb.api

import time
import re

from elasticsearch import NotFoundError, RequestError, TransportError
from elasticsearch import __version__ as es_version
from elasticsearch.helpers import scan, bulk, reindex

from exceptions import MappingsException

import logging
log = logging.getLogger(__name__)

# Field-type literals used by DB.properties, chosen by the installed
# elasticsearch client major version (es_version[0]):
#   TEXT  - full-text string field type ('text' from 5 on, 'string' before)
#   FALSE - value given to the mapping "index" option ('false' from 5 on,
#           'no' before)
# https://www.elastic.co/guide/en/elasticsearch/reference/5.5/string.html
if es_version[0] >= 5:
    TEXT = 'text'
    FALSE = 'false'
else:
    TEXT = 'string'
    FALSE = 'no'


def current_time_millisec():
    '''Return the current Unix time as an integer number of milliseconds.

    The float returned by ``time.time()`` is scaled to milliseconds and
    rounded to the nearest integer.
    '''
    millis = time.time() * 1000
    return int(round(millis))
def validate_book(body):
    '''
    This does not only accept/refuse a book. It also returns an ENHANCED
    version of body, with (mostly fts-related) additional fields.

    This function is idempotent.

    :param body: book document (dict). It is modified in place and returned.
    :returns: `body`, with every key starting with ``_text`` removed and a
              fresh ``_text_<language>`` field holding, space-joined, all the
              words that :py:func:`collectStrings` gathers from `body`.
    :raises ValueError: if ``_language`` is missing, empty/None, or longer
                        than two characters.
    '''
    if '_language' not in body:
        raise ValueError('language needed')
    language = body['_language']
    # an empty (or None) language would produce a meaningless "_text_" field
    if not language or len(language) > 2:
        raise ValueError('invalid language: %s' % language)

    # remove old _text_* fields; collect the keys first because deleting
    # from a dict while iterating over it directly fails on Python 3
    stale_keys = [key for key in body if key.startswith('_text')]
    for key in stale_keys:
        del body[key]

    allfields = collectStrings(body)
    body['_text_%s' % language] = ' '.join(allfields)
    return body
try:
    _STRING_TYPES = basestring  # Python 2: matches both str and unicode
except NameError:  # Python 3 has no basestring; str is the text type
    _STRING_TYPES = str


def collectStrings(leftovers):
    '''Recursively collect the whitespace-separated words found in a value.

    :param leftovers: a string, a list/tuple, a dict, or any other value.
    :returns: list of words in traversal order. Strings are split on
              whitespace; list/tuple items and dict values are visited
              recursively; dict entries whose key starts with ``_`` are
              skipped (internal fields such as ``_language``/``_text_*``);
              any other value contributes nothing.
    '''
    if isinstance(leftovers, _STRING_TYPES):
        return leftovers.split()

    strings = []
    # tuples are serialized as JSON arrays too, so treat them like lists
    if isinstance(leftovers, (list, tuple)):
        for item in leftovers:
            strings.extend(collectStrings(item))
    elif isinstance(leftovers, dict):
        for key, value in leftovers.items():
            if not key.startswith('_'):
                strings.extend(collectStrings(value))
    return strings
class DB(object):
    '''
    This class contains every query method and every operation on the index

    The following elasticsearch body response example provides the typical
    structure of a single document.

    .. code-block:: ruby

        { "_index" : "libreant",
          "_type" : "book",
          "_id" : "AU4RleAfD1zQdqx6OQ8Y",
          "_version" : 1,
          "found" : true,
          "_source": {"_language": "en",
                      "_text_en": "marco belletti pdf file latex manual",
                      "author": "marco belletti",
                      "type": "pdf file",
                      "title": "latex manual",
                      "_attachments": [{"sha1": "dc8dc34b3e0fec2377e5cf9ea7e4780d87ff18c5",
                                        "name": "LaTeX_Wikibook.pdf",
                                        "url": "fsdb:///dc8dc34b3e0fec2377e5cf9ea7e4780d87ff18c5",
                                        "notes": "An example book: LaTeX wikibook",
                                        "mime": "application/pdf",
                                        "download_count": 7,
                                        "id": "17fd3d898a834e2689340cc8aacdebb4",
                                        "size": 23909451}]
                     }
        }
    '''

    properties = {
        "_insertion_date": {
            "type": "long",
            "null_value": 0},
        "_language": {
            "type": TEXT,
            "index": FALSE},
        "_text_en": {
            "type": TEXT,
            "analyzer": "english"},
        "_text_it": {
            "type": TEXT,
            "analyzer": "it_analyzer"},
    }

    # Just like the default one
    # http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/analysis-lang-analyzer.html#italian-analyzer
    # but the stemmer changed from light_italian to italian
    settings = {"analysis": {
        "filter": {
            "italian_elision": {
                "type": "elision",
                "articles": [
                    "c", "l", "all", "dall", "dell",
                    "nell", "sull", "coll", "pell",
                    "gl", "agl", "dagl", "degl", "negl",
                    "sugl", "un", "m", "t", "s", "v", "d"
                ]
            },
            "italian_stop": {
                "type": "stop",
                "stopwords": "_italian_"},
            "italian_stemmer": {
                "type": "stemmer",
                "language": "italian"}
        },
        "analyzer": {
            "it_analyzer": {
                "type": "custom",
                "tokenizer": "standard",
                "filter": [
                    "italian_elision",
                    "lowercase",
                    "italian_stop",
                    "italian_stemmer"
                ]
            }
        }
    }}

    # Lowercased prefixes of the TransportError.error value that mean
    # "index already exists". The spelling depends on the Elasticsearch
    # server version, so all known forms are accepted.
    _INDEX_EXISTS_ERRORS = ('indexalreadyexistsexception',
                            'index_already_exists_exception',
                            'resource_already_exists_exception')

    # Setup {{{2
    def __init__(self, es, index_name):
        self.es = es
        self.index_name = index_name
        # book_validator can adjust the book, and raise if it's not valid
        self.book_validator = validate_book

    def setup_db(self, wait_for_ready=True):
        '''Create and configure index

        If the index already exists its properties mappings are updated
        (a failure is only logged); otherwise the index is created.

        If `wait_for_ready` is True, this function will block until
        status for `self.index_name` will be `yellow`
        '''
        if self.es.indices.exists(self.index_name):
            try:
                self.update_mappings()
            except MappingsException as ex:
                log.error(ex)
                log.warning('An old or wrong properties mapping has been found for index: "{0}", '
                            'this could lead to some errors. '
                            'It is recommended to run "libreant-db upgrade"'.format(self.index_name))
        else:
            log.debug("Index is missing: '{0}'".format(self.index_name))
            self.create_index()

        if wait_for_ready:
            log.debug('waiting for index "{}" to be ready'.format(self.index_name))
            self.es.cluster.health(index=self.index_name, level='index', wait_for_status='yellow')
            log.debug('index "{}" is now ready'.format(self.index_name))

    def update_mappings(self):
        '''Push every entry of :py:attr:`DB.properties` to the index mapping.

        Each property is sent with its own ``put_mapping`` call, so a
        failure on one property does not prevent the others from being tried.

        :raises MappingsException: if at least one property could not be
                                   updated; the message maps each failing
                                   property to its error reason.
        '''
        log.debug('updating index properties mappings')
        errors = {}
        for prop_k, prop_m in self.properties.items():
            try:
                self.es.indices.put_mapping(index=self.index_name, doc_type='book',
                                            body={'properties': {prop_k: prop_m}})
            except RequestError as err:
                errors[prop_k] = self._request_error_reason(err)
        if errors:
            raise MappingsException("Couldn't update index properties mapping: {0}".format(errors))

    @staticmethod
    def _request_error_reason(err):
        '''Extract a single-line reason from a RequestError's `info` payload.'''
        cause = err.info
        if isinstance(cause, dict):
            cause = cause.get('error', cause)
        if isinstance(cause, dict):
            try:
                cause = cause['root_cause'][0]['reason']
            except (KeyError, IndexError, TypeError):
                pass
        # `info` is not always a dict payload; only strings get flattened
        if hasattr(cause, 'replace'):
            cause = cause.replace('\n', ' ')
        return cause

    def create_index(self, indexname=None, index_conf=None):
        '''Create the index

        Create the index with given configuration.
        If `indexname` is provided it will be used as the new index name
        instead of the class one (:py:attr:`DB.index_name`)

        :param index_conf: configuration to be used in index creation. If this
                           is not specified the default index configuration will be used.
        :raises Exception: if the index already exists.
        :raises TransportError: for any other creation failure.
        '''
        if indexname is None:
            indexname = self.index_name
        log.debug("Creating new index: '{0}'".format(indexname))
        if index_conf is None:
            index_conf = {'settings': self.settings,
                          'mappings': {'book': {'properties': self.properties}}}
        try:
            self.es.indices.create(index=indexname, body=index_conf)
        except TransportError as te:
            if self._is_index_exists_error(te):
                raise Exception("Cannot create index '{}', already exists".format(indexname))
            raise

    @classmethod
    def _is_index_exists_error(cls, err):
        '''Return True if the TransportError `err` reports an existing index.'''
        error = err.error
        # guard against non-string error payloads
        if not hasattr(error, 'lower'):
            return False
        error = error.lower()
        return any(error.startswith(name) for name in cls._INDEX_EXISTS_ERRORS)

    def clone_index(self, new_indexname, index_conf=None):
        '''Clone current index

        All entries of the current index will be copied into the newly
        created one named `new_indexname`

        :param index_conf: Configuration to be used in the new index creation.
                           This param will be passed directly to :py:func:`DB.create_index`
        '''
        log.debug("Cloning index '{}' into '{}'".format(self.index_name, new_indexname))
        self.create_index(indexname=new_indexname, index_conf=index_conf)
        # module-level elasticsearch.helpers.reindex, not the DB.reindex method
        reindex(self.es, self.index_name, new_indexname)

    def reindex(self, new_index=None, index_conf=None):
        '''Rebuild the current index

        This function could be useful in the case you want to change some index settings/mappings
        and you don't want to lose all the entries belonging to that index.

        This function is built in such a way that you can continue to use the old index name,
        this is achieved using index aliases.

        The old index will be cloned into a new one with the given `index_conf`.
        If we are working on an alias, it is redirected to the new index.
        Otherwise a brand new alias with the old index name is created in such a way that
        points to the newly create index.

        Keep in mind that even if you can continue to use the same index name,
        the old index will be deleted.

        If `new_index` is not given, a name ending in ``_v<N>`` is bumped to
        ``_v<N+1>``; any other name gets ``_v1`` appended.

        :param index_conf: Configuration to be used in the new index creation.
                           This param will be passed directly to :py:func:`DB.create_index`
        '''
        alias = self.index_name if self.es.indices.exists_alias(name=self.index_name) else None
        if alias:
            original_index = self.es.indices.get_alias(self.index_name).popitem()[0]
        else:
            original_index = self.index_name

        if new_index is None:
            # capture the whole version number, not just its last digit
            mtc = re.match(r"^(.*_v)(\d+)$", original_index)
            if mtc:
                new_index = mtc.group(1) + str(int(mtc.group(2)) + 1)
            else:
                new_index = original_index + '_v1'

        log.debug("Reindexing {{ alias: '{}', original_index: '{}', new_index: '{}'}}".format(alias, original_index, new_index))
        self.clone_index(new_index, index_conf=index_conf)

        if alias:
            log.debug("Moving alias from ['{0}' -> '{1}'] to ['{0}' -> '{2}']".format(alias, original_index, new_index))
            self.es.indices.update_aliases(body={
                "actions": [
                    {"remove": {"alias": alias, "index": original_index}},
                    {"add": {"alias": alias, "index": new_index}}
                ]})

        log.debug("Deleting old index: '{}'".format(original_index))
        self.es.indices.delete(original_index)

        if not alias:
            # the deleted index's name becomes an alias of the new index
            log.debug("Creating new alias: ['{0}' -> '{1}']".format(original_index, new_index))
            self.es.indices.update_aliases(body={
                "actions": [
                    {"add": {"alias": original_index, "index": new_index}}
                ]})

    # End setup }}}

    # Queries {{{2

    def __len__(self):
        return self.es.count(index=self.index_name)['count']

    def _search(self, body, **kargs):
        return self.es.search(index=self.index_name, body=body, doc_type='book', **kargs)

    def _get_search_field(self, field, value):
        return {'query':
                {'match': {field: value}}
                }

    def mlt(self, _id):
        '''
        High-level method to do "more like this".

        Its exact implementation can vary.
        '''
        query = {
            'query': {'more_like_this': {
                'like': {'_id': _id},
                'min_term_freq': 1,
                'min_doc_freq': 1,
            }}
        }
        if es_version[0] <= 1:
            # for es_version <= 1 the document id goes in "ids" instead of "like"
            mlt_clause = query['query']['more_like_this']
            mlt_clause['ids'] = [_id]
            del mlt_clause['like']
        return self._search(query)

    def get_all_books(self, size=30):
        return self._search({}, size=size)

    def iterate_all(self):
        return scan(self.es, index=self.index_name)

    def get_last_inserted(self, size=30):
        query = {"query": {"match_all": {}},
                 "sort": [{"_insertion_date": {"order": "desc", "missing": "_last"}}]}
        return self._search(body=query, size=size)

    def get_books_simplequery(self, query):
        return self._search({'query': {'multi_match': {'query': query, 'fields': '_text_*'}}})

    def get_books_by_title(self, title):
        return self._search(self._get_search_field('title', title))

    def get_books_by_actor(self, authorname):
        return self._search(self._get_search_field('actors', authorname))

    def get_book_by_id(self, id):
        return self.es.get(index=self.index_name, id=id, doc_type='book')

    def get_books_querystring(self, query, **kargs):
        q = {'query': query, 'fields': ['_text_*']}
        return self._search({'query': dict(query_string=q)}, **kargs)

    def file_is_attached(self, url):
        '''return true if at least one book has
        file with the given url as attachment
        '''
        body = self._get_search_field('_attachments.url', url)
        return self.es.count(index=self.index_name, body=body)['count'] > 0

    def autocomplete(self, fieldname, start):
        raise NotImplementedError()

    # End queries }}}

    # Operations {{{2

    def add_book(self, body, doc_type='book'):
        '''
        Call it like this:
            db.add_book(doc_type='book',
            body={'title': 'foobar', '_language': 'it'})

        The book is passed through `self.book_validator` and stamped with an
        ``_insertion_date`` (milliseconds) before being indexed.
        '''
        body = self.book_validator(body)
        body['_insertion_date'] = current_time_millisec()
        return self.es.index(index=self.index_name, doc_type=doc_type, body=body)

    def delete_book(self, id):
        self.es.delete(index=self.index_name, id=id, doc_type='book')

    def delete_all(self):
        '''Delete all books from the index'''
        def delete_action_gen():
            scanner = scan(self.es,
                           index=self.index_name,
                           query={'query': {'match_all': {}}})
            for v in scanner:
                yield {'_op_type': 'delete',
                       '_index': self.index_name,
                       '_type': v['_type'],
                       '_id': v['_id'],
                       }
        bulk(self.es, delete_action_gen())

    def update_book(self, id, body, doc_type='book'):
        ''' Update a book

        The "body" is merged with the current one.
        Yes, it is NOT overwritten.

        The merged document is passed through `self.book_validator` and
        written back with the version that was read.

        In case of concurrency conflict
        this function could raise `elasticsearch.ConflictError`
        '''
        # note that we are NOT overwriting all the _source, just merging
        book = self.get_book_by_id(id)
        book['_source'].update(body)
        validated = self.book_validator(book['_source'])
        ret = self.es.index(index=self.index_name, id=id,
                            doc_type=doc_type, body=validated, version=book['_version'])
        return ret

    def modify_book(self, id, body, doc_type='book', version=None):
        ''' replace the entire book body

        Instead of `update_book` this function
        will overwrite the book content with param body

        The new body is passed through `self.book_validator` first.

        If param `version` is given, it will be checked that the changes are applied upon
        that document version. If the document version provided is different from the one
        actually found, an `elasticsearch.ConflictError` will be raised
        '''
        validatedBody = self.book_validator(body)
        params = dict(index=self.index_name, id=id, doc_type=doc_type, body=validatedBody)
        if version:
            params['version'] = version
        ret = self.es.index(**params)
        return ret

    def increment_download_count(self, id, attachmentID, doc_type='book'):
        '''
        Increment the download counter of a specific file

        A missing ``download_count`` is treated as 0.

        :raises NotFoundError: if the book has no attachment with id `attachmentID`.
        '''
        # read and write the same doc_type
        body = self.es.get(index=self.index_name, id=id, doc_type=doc_type,
                           _source_include='_attachments')['_source']
        attachments = body.get('_attachments', [])
        for attachment in attachments:
            if attachment.get('id') == attachmentID:
                attachment['download_count'] = attachment.get('download_count', 0) + 1
                self.es.update(index=self.index_name, id=id, doc_type=doc_type,
                               body={"doc": {'_attachments': attachments}})
                return
        # NotFoundError (a TransportError) expects (status_code, error, ...)
        # arguments; with a single argument its str() fails.
        raise NotFoundError(404, "No attachment could be found with id: {}".format(attachmentID))

    # End operations }}}

# vim: set fdm=marker fdl=1: