Source code for compath.visualization.d3_dendrogram

# -*- coding: utf-8 -*-

"""Utils to generate the D3.js dendrogram. This module is adapted from https://gist.github.com/mdml/7537455."""

import itertools as itt
import math

import numpy as np
import pandas as pd
import scipy
import scipy.cluster
import scipy.stats
from scipy.spatial.distance import pdist


def _check_error_distance(distance_matrix, pathway_manager_dict, similarity_matrix):
    """Remove column and row in matrix after value error to proceed with clustering.

    :param numpy.ndarray distance_matrix:
    :param dict pathway_manager_dict:
    :param pandas.DataFrame similarity_matrix:
    :rtype: tuple(numpy.ndarray, dict, pandas.DataFrame)
    :return: distance matrix, pathway_manager_dict, and similarity_matrix
    """
    if np.all(np.isfinite(distance_matrix)):
        return distance_matrix, pathway_manager_dict, similarity_matrix

    # Checks rows that only contain one unique number and this number is close to 1
    pathways_to_remove = {
        index
        for index, row in similarity_matrix.iterrows()
        if row.nunique() == 1 and math.isclose(row.unique()[0], 1, rel_tol=1e-5)
    }

    # Remove all columns/rows having the pathway label
    for pathway in pathways_to_remove:
        similarity_matrix = similarity_matrix.drop(axis=0, labels=pathway)
        similarity_matrix = similarity_matrix.drop(axis=1, labels=pathway)

    # Recalculate the distances
    distance_matrix = pdist(
        similarity_matrix,
        metric='correlation',
        # **{'centered': False} # TODO: try in Python 3.6
    )

    # Remove pathways
    for pathway in pathways_to_remove:
        pathway_manager_dict.pop(pathway)

    return distance_matrix, pathway_manager_dict, similarity_matrix


[docs]def create_similarity_matrix(gene_sets): """Create a similarity matrix for a given pathway-geneset dataset. :param dict gene_sets: pathway gene set dictionary :rtype: pandas.DataFrame :returns: similarity matrix """ index = sorted(gene_sets.keys()) similarity_dataframe = pd.DataFrame(0.0, index=index, columns=index) for pathway_1, pathway_2 in itt.product(index, index): intersection = len(gene_sets[pathway_1].intersection(gene_sets[pathway_2])) smaller_set = min(len(gene_sets[pathway_1]), len(gene_sets[pathway_2])) similarity = float(intersection / smaller_set) # Formula to calculate similarity similarity_dataframe[pathway_1][pathway_2] = similarity return similarity_dataframe
[docs]def add_node(node, parent): """Create a nested dictionary from the ClusterNode's returned by SciPy. :param node: :param dict parent: """ # First create the new node and append it to its parent's children new_node = dict(node_id=node.id, children=[]) parent["children"].append(new_node) # Recursively add the current node's children if node.left: add_node(node.left, new_node) if node.right: add_node(node.right, new_node)
[docs]def label_tree(id_name_dict, name_manager_dict, cluster_to_x, tree): """Label the tree in a recursive way with names, resource and cluster information. :param dict[str,str] id_name_dict: node_id to name dictionary :param dict[str,str] name_manager_dict: node name to resource ditionary :param dict[tuple[int,int],float] cluster_to_x: node_id tuple of the cluster to distance :param dict tree: tree like structure :rtype: list """ if len(tree["children"]) == 0: leafs_ids = [tree["node_id"]] node_name = id_name_dict[leafs_ids[0]] tree["name"] = node_name tree["color"] = name_manager_dict[node_name] tree["y"] = 0 return [] result = [(tree["node_id"], list(tree["children"]))] childs = [] for child in tree["children"]: # Iterate over the two children childs.append(child["node_id"]) result.extend(label_tree(id_name_dict, name_manager_dict, cluster_to_x, child)) # Recursive tree transversal tree["y"] = cluster_to_x[childs[0], childs[1]] return result
[docs]def get_dendrogram_tree(gene_sets, pathway_manager_dict): """Return ready to plot json data. :param dict[str,set[str]] gene_sets: pathway gene sets dict :param dict[str,str] pathway_manager_dict: pathway name to manager dictionary :rtype: tuple[dict,int] :return: json tree like structure """ similarity_matrix = create_similarity_matrix(gene_sets) # Create the dissimilarity matrix for each row of the similarity matrix using 1-R where R is the pearson correlation # Between two rows distance_matrix = pdist( similarity_matrix, metric='correlation', # **{'centered': False} # TODO: try in Python 3.6 ) # Checks for exceptions (pathways with 1 gene only matching the gene queried causes division by zero problems because the distance of this pathway to all others is 1.0) distance_matrix, pathway_manager_dict, similarity_matrix = _check_error_distance( distance_matrix, pathway_manager_dict, similarity_matrix ) # Calculate clusters clusters = scipy.cluster.hierarchy.linkage(distance_matrix, method='average') # Tree lik object tree = scipy.cluster.hierarchy.to_tree(clusters, rd=False) # Dictionary of tuple of nodes ids (cluster) pointing to the distance in the histogram of that cluster cluster_to_x = { (int(cluster[0]), int(cluster[1])): cluster[2] for cluster in clusters } # Create dictionaries necessary to label the tree object with node and resource info pathways = list(similarity_matrix.columns) id_name_dict = dict(zip(range(len(pathways)), pathways)) # Initialize nested dictionary for d3, then recursively iterate through tree d3_dendrogram = dict(children=[], name="Root") add_node(tree, d3_dendrogram) label_tree(id_name_dict, pathway_manager_dict, cluster_to_x, d3_dendrogram["children"][0]) return d3_dendrogram, len(pathways)
[docs]def get_descendants(manager, resource, pathway_id, pathway_name): """Generate d3 dendrogram structure by using BFS starting from the starting from a parent (root) node to the last descendants. :param manager: ComPath manager :param str resource: resource name :param str pathway_id: pathway identifier in the resource :param str pathway_name: pathway name :return: parent-children data structure :rtype: list[dict] """ # Create the entry dictionary of the pathway (node). d3_dendrogram = dict( children=[], name=pathway_name.replace(' - Homo sapiens (human)', ''), # Replace KEGG Suffixes pathway_id=pathway_id, resource=resource ) # Get direct descendents for the pathway. descendent_mappings = manager.get_decendents_mappings_from_pathway_with_is_part_of_relationship( resource, pathway_id, pathway_name ) # Return the entry dict with no children if the node got no descendants. if not descendent_mappings: return d3_dendrogram # Do the recusive call for each child. for mapping in descendent_mappings: pathway = mapping.get_complement_mapping_info(resource, pathway_id, pathway_name) d3_dendrogram["children"].append( get_descendants( manager, pathway[0], pathway[1], pathway[2] ) ) return d3_dendrogram
[docs]def get_mapping_dendrogram(manager, resource, pathway_id, pathway_name): """Generate d3 dendrogram structure by using BFS starting from the queried node in both directions of the hierarchy. :param manager: ComPath manager :param str resource: resource name :param str pathway_id: pathway identifier in the resource :param str pathway_name: pathway name :return: parent-children data structure :rtype: list[dict] """ ancestries_mappings = [] common_ancestries = [] root = [resource, pathway_id, pathway_name] # Get direct progenitors for the pathway. ancestry_mappings = manager.get_ancestry_mappings_from_pathway_with_is_part_of_relationship( resource, pathway_id, pathway_name ) # Set as root if there is some progenitor (parent) if ancestry_mappings: root = ancestry_mappings[0].get_complement_mapping_info(resource, pathway_id, pathway_name) # If there are many progenitors, get the common ancestry of the progenitors (parents) if len(ancestry_mappings) > 1: for ancestry in ancestry_mappings: pathway = ancestry.get_complement_mapping_info(resource, pathway_id, pathway_name) mapping = manager.get_ancestry_mappings_from_pathway_with_is_part_of_relationship( pathway[0], pathway[1], pathway[2] ) if mapping: ancestries_mappings.append(mapping) if not common_ancestries: common_ancestries = set(mapping) else: common_ancestries.intersection(set(mapping)) # Set as root if there is some ancestry (grandparent) if common_ancestries: root = list(common_ancestries)[0].get_complement_mapping_info(resource, pathway_id, pathway_name) # If there are many ancestries, do a recursive call to get the ancestries (grandparents) if len(common_ancestries) > 1: get_mapping_dendrogram( manager, root[0], root[1], root[2] ) return get_descendants(manager, root[0], root[1], root[2])
[docs]def add_mapping_node(id_name_dict, name_manager_dict, cluster_to_x, tree): """Label the tree in a recursive way with names, resource and cluster information. :param dict[str,str] id_name_dict: node_id to name dictionary :param dict[str,str] name_manager_dict: node name to resource ditionary :param dict[tuple[int,int],float] cluster_to_x: node_id tuple of the cluster to distance :param dict tree: tree like structure :rtype: list """ if len(tree["children"]) == 0: leafs_ids = [tree["node_id"]] node_name = id_name_dict[leafs_ids[0]] tree["name"] = node_name tree["color"] = name_manager_dict[node_name] tree["y"] = 0 return [] result = [(tree["node_id"], list(tree["children"]))] childs = [] for child in tree["children"]: # Iterate over the two children childs.append(child["node_id"]) result.extend(label_tree(id_name_dict, name_manager_dict, cluster_to_x, child)) # Recursive tree transversal tree["y"] = cluster_to_x[childs[0], childs[1]] return result