# -*- coding: utf-8 -*-
"""This module the database manager of ComPath."""
import datetime
import logging
from typing import List, Optional
from sqlalchemy import and_, create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
from bio2bel.utils import get_connection
from . import managers
from .constants import EQUIVALENT_TO, IS_PART_OF, MAPPING_TYPES, MODULE_NAME
from .models import Base, PathwayMapping, User, Vote, mappings_users
__all__ = [
'Manager'
]
log = logging.getLogger(__name__)
def _flip_service_order(service_1_name: str, service_2_name: str) -> bool:
"""Decide whether the service order should be flipped (true if they should be).
:param service_1_name:
:param service_2_name:
"""
if service_1_name == service_2_name:
return False
return service_1_name > service_2_name
def _ensure_manager(name):
if name not in managers:
raise ValueError('Manager does not exist for {}. Available: {}'.format(name, managers))
[docs]class Manager(object):
"""Database manager."""
def __init__(self, engine, session):
"""Init ComPath manager."""
self.engine = engine
self.session = session
self.create_all()
[docs] @staticmethod
def from_connection(connection=None):
"""Initialize manager from connection string"""
connection = get_connection(MODULE_NAME, connection)
engine = create_engine(connection)
session_maker = sessionmaker(bind=engine, autoflush=False, expire_on_commit=False)
session = scoped_session(session_maker)
return Manager(engine, session)
[docs] def create_all(self, check_first: bool = True):
"""Create tables for ComPath."""
Base.metadata.create_all(self.engine, checkfirst=check_first)
[docs] def drop_all(self, check_first: bool = True):
"""Drop all tables for ComPath."""
Base.metadata.drop_all(self.engine, checkfirst=check_first)
"""Query methods"""
[docs] def count_votes(self) -> int:
"""Count the votes in the database."""
return self.session.query(Vote).count()
[docs] def count_mappings(self) -> int:
"""Count the mappings in the database."""
return self.session.query(PathwayMapping).count()
[docs] def count_users(self) -> int:
"""Count the Users in the database."""
return self.session.query(User).count()
[docs] def count_mapping_user(self) -> int:
"""Count the UsersMappings table in the database.
:rtype: int
"""
return self.session.query(mappings_users).count()
[docs] def get_all_mappings(self) -> List[PathwayMapping]:
"""Get all mappings in the database."""
return self.session.query(PathwayMapping).all()
[docs] def get_all_accepted_mappings(self) -> List[PathwayMapping]:
"""Get all accepted mappings in the database."""
return self.session.query(PathwayMapping).filter(PathwayMapping.accepted == True).all()
[docs] def get_mappings_by_type(self, mapping_type: str) -> List[PathwayMapping]:
"""Get all mappings in the database.
:param mapping_type: type of the mapping
"""
if mapping_type not in MAPPING_TYPES:
raise ValueError('{} is not valid mapping mapping_type'.format(mapping_type))
return self.session.query(PathwayMapping).filter(PathwayMapping.type == mapping_type).all()
[docs] def get_vote_by_id(self, vote_id: str) -> Optional[Vote]:
"""Get a vote by its id.
:param vote_id: identifier
"""
return self.session.query(Vote).filter(Vote.id == vote_id).one_or_none()
[docs] def get_vote(self, user: User, mapping: PathwayMapping) -> Optional[Vote]:
"""Get a vote.
:param user: User instance
:param mapping: Mapping instance
"""
return self.session.query(Vote).filter(and_(Vote.user == user, Vote.mapping == mapping)).one_or_none()
[docs] def get_user_by_email(self, email: str) -> Optional[User]:
"""Get a user by their email address."""
return self.session.query(User).filter(User.email == email).one_or_none()
[docs] def get_mapping(self, service_1_name, pathway_1_id, pathway_1_name, service_2_name, pathway_2_id, pathway_2_name,
mapping_type):
"""Query mapping in the database.
:param str service_1_name: manager name of the service 1
:param str pathway_1_id: pathway 1 id
:param str pathway_1_name: pathway 1 name
:param str service_2_name: manager name of the service 1
:param str pathway_2_id: pathway 2 id
:param str pathway_2_name: pathway 2 name
:param str mapping_type: mapping type (isPartOf or equivalentTo)
:rtype: Optional[Mapping]
"""
mapping_filter = and_(
PathwayMapping.service_1_name == service_1_name,
PathwayMapping.service_1_pathway_id == pathway_1_id,
PathwayMapping.service_1_pathway_name == pathway_1_name,
PathwayMapping.service_2_name == service_2_name,
PathwayMapping.service_2_pathway_id == pathway_2_id,
PathwayMapping.service_2_pathway_name == pathway_2_name,
PathwayMapping.type == mapping_type,
)
return self.session.query(PathwayMapping).filter(mapping_filter).one_or_none()
[docs] def get_mapping_by_id(self, mapping_id: int) -> Optional[PathwayMapping]:
"""Get a mapping by its id.
:param mapping_id: mapping id
"""
return self.session.query(PathwayMapping).filter(PathwayMapping.id == mapping_id).one_or_none()
[docs] def get_or_create_vote(self, user, mapping, vote_type=True):
"""Get or create vote.
:param User user: User instance
:param PathwayMapping mapping: Mapping instance
:param Optional[Vote.type] vote_type: vote type
:rtype: Vote
"""
vote = self.get_vote(user, mapping)
if vote is None:
vote = Vote(
user=user,
mapping=mapping,
type=vote_type
)
self.session.add(vote)
self.session.commit()
# If there was already a vote, and it's being changed
elif vote_type is not None:
vote.type = vote_type
vote.changed = datetime.datetime.utcnow()
self.session.commit()
return vote
[docs] def get_or_create_mapping(self, service_1_name, pathway_1_id, pathway_1_name, service_2_name, pathway_2_id,
pathway_2_name, mapping_type, user):
"""Get or create a mapping.
:param str service_1_name: manager name of the service 1
:param str pathway_1_name: pathway 1 name
:param str pathway_1_id: pathway 1 id
:param str service_2_name: manager name of the service 1
:param str pathway_2_name: pathway 2 name
:param str pathway_2_id: pathway 2 id
:param str mapping_type: type of mapping
:param User user: the user
:return: PathwayMapping and boolean indicating if the mapping was created or not
:rtype: tuple[PathwayMapping,bool]
"""
# Ensure maintaining the order of pathways if they belong to the same database
flip_order = (
mapping_type == EQUIVALENT_TO and
service_1_name == service_2_name and
_flip_service_order(pathway_1_name, pathway_2_name)
)
if flip_order:
return self.get_or_create_mapping(
service_2_name,
pathway_2_id,
pathway_2_name,
service_1_name,
pathway_1_id,
pathway_1_name,
mapping_type,
user
)
# Ensure maintaining the order of the resources
if mapping_type == EQUIVALENT_TO and _flip_service_order(service_1_name, service_2_name):
return self.get_or_create_mapping(
service_2_name,
pathway_2_id,
pathway_2_name,
service_1_name,
pathway_1_id,
pathway_1_name,
mapping_type,
user
)
_ensure_manager(service_1_name)
_ensure_manager(service_2_name)
mapping = self.get_mapping(
service_1_name=service_1_name,
pathway_1_id=pathway_1_id,
pathway_1_name=pathway_1_name,
service_2_name=service_2_name,
pathway_2_id=pathway_2_id,
pathway_2_name=pathway_2_name,
mapping_type=mapping_type
)
if mapping is not None:
_ = self.claim_mapping(mapping, user)
return mapping, False
mapping = PathwayMapping(
service_1_name=service_1_name,
service_1_pathway_id=pathway_1_id,
service_1_pathway_name=pathway_1_name,
service_2_name=service_2_name,
service_2_pathway_id=pathway_2_id,
service_2_pathway_name=pathway_2_name,
type=mapping_type
)
vote = Vote(
mapping=mapping,
user=user
)
self.session.add(mapping)
self.session.add(vote)
mapping.creators.append(user)
self.session.commit()
return mapping, True
[docs] def delete_all_mappings(self):
"""Delete all the votes then all the mappings."""
self.session.query(Vote).delete()
self.session.query(PathwayMapping).delete()
self.session.commit()
[docs] def delete_mapping_by_id(self, mapping_id):
"""Delete a mapping by its id.
:param int mapping_id: mapping id
:rtype: bool
"""
mapping = self.get_mapping_by_id(mapping_id)
if mapping:
self.session.delete(mapping)
self.session.commit()
return True
return False
"""Custom Model Manipulations"""
[docs] def claim_mapping(self, mapping, user):
"""Check if user has already established the mapping, if not claims it.
:param PathwayMapping mapping: Mapping instance
:param User user: User
:rtype: bool
:return: if mapping was assigned to user
"""
if user in mapping.creators:
return False
mapping.creators.append(user)
_ = self.get_or_create_vote(user, mapping)
return True
[docs] def accept_mapping(self, mapping_id):
"""Accept established mapping (from user or curator consensus).
:param int mapping_id: mapping id
:rtype: tuple[PathwayMapping,bool]
:return: mapping and boolean that indicates if transaction was made
"""
mapping = self.get_mapping_by_id(mapping_id)
if not mapping:
return None, False
if mapping.accepted:
return mapping, False
mapping.accepted = True
self.session.commit()
return mapping, True
[docs] def get_mappings_from_pathway_with_relationship(self, type, service_name, pathway_id, pathway_name):
"""Get all mappings matching pathway and service name.
:param str type: mapping type
:param str service_name: service name
:param str pathway_id: original pathway identifier
:param str pathway_name: pathway name
:rtype: list[PathwayMapping]
:return:
"""
return self.session.query(PathwayMapping).filter(
PathwayMapping.has_pathway_tuple(type, service_name, pathway_id, pathway_name)).all()
[docs] def get_decendents_mappings_from_pathway_with_is_part_of_relationship(self, service_name, pathway_id, pathway_name):
"""Get all mappings matching pathway and service name.
:param str type: mapping type
:param str service_name: service name
:param str pathway_id: original pathway identifier
:param str pathway_name: pathway name
:rtype: list[PathwayMapping]
:return:
"""
return self.session.query(PathwayMapping).filter(
PathwayMapping.has_descendant_pathway_tuple(IS_PART_OF, service_name, pathway_id, pathway_name)).all()
[docs] def get_ancestry_mappings_from_pathway_with_is_part_of_relationship(self, service_name, pathway_id, pathway_name):
"""Get all mappings matching pathway and service name.
:param str type: mapping type
:param str service_name: service name
:param str pathway_id: original pathway identifier
:param str pathway_name: pathway name
:rtype: list[PathwayMapping]
:return:
"""
return self.session.query(PathwayMapping).filter(
PathwayMapping.has_ancestry_pathway_tuple(IS_PART_OF, service_name, pathway_id, pathway_name)).all()
[docs] def get_all_mappings_from_pathway(self, service_name, pathway_id, pathway_name):
"""Get all mappings matching pathway and service name.
:param str service_name: service name
:param str pathway_id: original pathway identifier
:param str pathway_name: pathway name
:rtype: list[PathwayMapping]
:return:
"""
return self.session.query(PathwayMapping).filter(
PathwayMapping.has_pathway(service_name, pathway_id, pathway_name)).all()
[docs] def get_all_pathways_from_db_with_mappings(self, pathway_database):
"""Get all mappings that contain a pathway from a given database.
:param str service_name: service name
:param str pathway_id: original pathway identifer
:param str pathway_name: pathway name
:rtype: list[PathwayMapping]
:return:
"""
return self.session.query(PathwayMapping).filter(
PathwayMapping.has_database_pathway(pathway_database)).all()
[docs] def infer_hierarchy(self, resource, pathway_id, pathway_name):
"""Infer the possible hierarchy of a given pathway based on its equivalent mappings.
:param str type: mapping type
:param str resource: service name
:param str pathway_id: pathway original identifier
:param str pathway_name: pathway name
:return:
"""
matching_mappings = self.get_mappings_from_pathway_with_relationship(
EQUIVALENT_TO, resource, pathway_id, pathway_name
)
inferred_mappings = []
for mapping in matching_mappings:
# Get all hierarchical mappings from equivalent pathways
complement_resource, complement_pathway_id, complement_pathway_name = mapping.get_complement_mapping_info(
resource, pathway_id, pathway_name
)
hierarchical_mappings_from_complement = self.get_mappings_from_pathway_with_relationship(
IS_PART_OF,
complement_resource,
complement_pathway_id,
complement_pathway_name
)
for hierarchical_mapping in hierarchical_mappings_from_complement:
inferred_mappings.append(hierarchical_mapping.get_complement_mapping_info(
resource, pathway_id, pathway_name
))
return inferred_mappings