Source code for users.api

from users import User, Group, Capability, db
from peewee import IntegrityError


[docs]def add_user(name, password): try: return User.create(name=name, password=password) except IntegrityError: raise ConflictException("user with the same name already exists")
[docs]def delete_user(id): if not User.delete().where(User.id == id).execute(): raise NotFoundException('no user could be found with this id')
[docs]def update_user(id, updates): with db.atomic(): u = get_user(id) if 'password' in updates: u.set_password(updates['password']) if 'name' in updates: u.name = updates['name'] u.save()
[docs]def get_users(): return User.select()
[docs]def get_user(id=None, name=None): try: if id: return User.get(User.id==id) elif name: return User.get(User.name==name) else: raise ValueError('nither `id` or `name` was given') except User.DoesNotExist: raise NotFoundException("no user could be found with these attributes")
[docs]def add_group(name): try: return Group.create(name=name) except IntegrityError: raise ConflictException("a group with the same name already exists")
[docs]def delete_group(id): if not Group.delete().where(Group.id == id).execute(): raise NotFoundException('no group could be found with this id')
[docs]def update_group(id, updates): with db.atomic(): g = get_group(id) if 'name' in updates: g.name = updates['name'] g.save()
[docs]def get_groups(): return Group.select()
[docs]def get_group(id=None, name=None): try: if id: return Group.get(Group.id==id) elif name: return Group.get(Group.name==name) else: raise ValueError('nither `id` or `name` was given') except Group.DoesNotExist: raise NotFoundException("no group could be found with these attributes")
[docs]def add_user_to_group(userID, groupID): with db.atomic(): u = get_user(userID) g = get_group(groupID) u.groups.add(g) u.save()
[docs]def remove_user_from_group(userID, groupID): with db.atomic(): u = get_user(userID) g = get_group(groupID) u.groups.remove(g) u.save()
[docs]def get_groups_of_user(userID): for g in get_user(id=userID).groups: yield g
[docs]def get_users_in_group(groupID): for u in get_group(id=groupID).users: yield u
[docs]def get_capabilities(): return Capability.select()
[docs]def get_capability(capID): try: return Capability.get(Capability.id == capID) except Capability.DoesNotExist: raise NotFoundException("no capability could be found with these attributes")
[docs]def add_capability(domain, action, simplified=True): try: if simplified: domain = Capability.simToReg(domain) return Capability.create(domain=domain, action=action) except IntegrityError: raise ConflictException("a capability with the same attributes already exists")
[docs]def delete_capability(capID): if not Capability.delete().where(Capability.id == capID).execute(): raise NotFoundException('no capability could be found with this id')
[docs]def update_capability(id, updates): with db.atomic(): cap = get_capability(id) if 'domain' in updates: cap.domain = Capability.simToReg(updates['domain']) if 'action' in updates: cap.action = updates['action'] cap.save()
[docs]def add_capability_to_group(capID, groupID): with db.atomic(): cap = get_capability(capID) grp = get_group(groupID) grp.capabilities.add(cap) cap.save()
[docs]def remove_capability_from_group(capID, groupID): with db.atomic(): cap = get_capability(capID) grp = get_group(groupID) grp.capabilities.remove(cap) cap.save()
[docs]def get_groups_with_capability(capID): for g in get_capability(capID).groups: yield g
[docs]def get_capabilities_of_group(groupID): for c in get_group(groupID).capabilities: yield c
[docs]def get_anonymous_user(): return get_user(name='anonymous')
[docs]def is_anonymous(user): return user.name == 'anonymous'
[docs]class NotFoundException(Exception): pass
[docs]class ConflictException(Exception): pass