Source code for cellcomplex.graph.temporal_property_graph

# -*- python -*-
#
#       OpenAlea.Container
#
#       Copyright 2011 - 2013 INRIA - CIRAD - INRA
#
#       File author(s): Jonathan Legrand <jonathan.legrand@ens-lyon.fr>
#                       Christophe Pradal
#
#       Distributed under the Cecill-C License.
#       See accompanying file LICENSE.txt or copy at
#           http://www.cecill.info/licences/Licence_CeCILL-C_V1-en.html
#
#       OpenAlea WebSite: http://openalea.gforge.inria.fr
#
################################################################################
"""This module provide a class that extends the PropertyGraph with different types of edges"""

__license__ = "Cecill-C"
__revision__ = " $Id$ "

from .property_graph import Graph, PropertyGraph

[docs]class TemporalPropertyGraph(PropertyGraph): """ Simple implementation of PropertyGraph using dict as properties and two dictionaries to maintain these properties """ STRUCTURAL = 's' TEMPORAL = 't' def __init__(self, graph=None, **kwds): PropertyGraph.__init__(self, graph, idgenerator='max',**kwds) self.add_edge_property('edge_type') # list of dict # each dict define the mapping between the new and the old vid. # old label define both graph index and local id. self.add_edge_property('old_label') self.add_vertex_property('old_label') self.add_vertex_property('index') self.nb_time_points = 0 # list of (N_tp) tuples of two dict vertex & edges: # (self._old_to_new_ids[tp][0] = old_to_new_vids; self._old_to_new_ids[tp][1] = old_to_new_eids). self._old_to_new_ids = []
[docs] def extend(self, graphs, mappings, time_steps = None, disable_lineage_checking=True): """ Extend the structure with graphs and mappings. Each graph contains structural edges. Mapping define dynamic edges between two graphs. :Parameters: - graphs - a list of PropertyGraph - mappings - a list defining the dynamic or temporal edges between two graphs. - time_steps - time corresponding to each graph :warning:: len(graphs) == len(mappings)-1 """ # - Usual paranoia (avoid useless computation): assert len(graphs) == len(mappings)+1 if time_steps is not None: assert len(graphs) == len(time_steps) # - First append a spatial graph (PropertyGraph): self.append(graphs[0]) self.nb_time_points += 1 # - Now loop over PropertyGraph & mapping (temporal relation between graphs): for g, m in zip(graphs[1:],mappings): self.append(g,m, disable_lineage_checking) self.nb_time_points += 1 # - Finally save the first property: 'time_steps' if time_steps is not None: self.add_graph_property('time_steps',time_steps) return self._old_to_new_ids
[docs] def append(self, graph, mapping=None, disable_lineage_checking=False): """ Append a (spatial) graph to the tpg structure with respect to a given temporal mapping. """ # Usual paranoia: check the existence of a first graph when trying to link it to a second one! if mapping: assert len(self._old_to_new_ids) >= 1, 'To create temporal edges between two graphs, first add a graph without mapping.' # Get our current index (position) in time using the length of the 'old_to_new_ids' dictionary: current_index = len(self._old_to_new_ids) # Get useful values in shorter variable names: edge_types = self.edge_property('edge_type') old_edge_labels = self.edge_property('old_label') old_vertex_labels = self.vertex_property('old_label') indices = self.vertex_property('index') # Add and translate the vertex and edge ids of the second graph: relabel_ids = Graph.extend(self,graph) old_to_new_vids, old_to_new_eids = relabel_ids self._old_to_new_ids.append(relabel_ids) # was put later, here should be good too! # Relabel the edge and vertex property: self._relabel_and_add_vertex_edge_properties(graph, old_to_new_vids, old_to_new_eids) # Update properties on graph temporalgproperties = self.graph_properties() # while on a property graph, graph_property are just dict of dict, # on a temporal property graph, graph_property are dict of list of dict # to keep the different values for each time point. for gname in graph.graph_property_names(): if gname in [self.metavidtypepropertyname,self.metavidtypepropertyname]: temporalgproperties[gname] = graph.graph_property(gname) else: newgproperty = graph.translate_graph_property(gname, old_to_new_vids, old_to_new_eids) temporalgproperties[gname] = temporalgproperties.get(gname,[])+[newgproperty] #~ self._old_to_new_ids.append(relabel_ids) # set edge_type property for structural edges for old_eid, eid in old_to_new_eids.items(): edge_types[eid] = self.STRUCTURAL old_edge_labels[eid] = old_eid for old_vid, vid in old_to_new_vids.items(): old_vertex_labels[vid] = old_vid indices[vid] = current_index def use_sub_lineage(mother, daughters, on_ids_source, on_ids_target): found_sub_lineage=False; tmp_daughters = [] for d in daughters: if iterable(d): found_sub_lineage=True; tmp_d = [] if "sub_lineage" not in self.graph_properties(): self.add_graph_property("sub_lineage") for sub_d in d: if iterable(sub_d): use_sub_lineage(mother, sub_d, on_ids_source, on_ids_target) else: tmp_d.append(on_ids_target[0][sub_d]) tmp_daughters.append(tmp_d) else: tmp_daughters.append(on_ids_target[0][d]) if found_sub_lineage: self.graph_property("sub_lineage").update({on_ids_source[0][mother]:tmp_daughters}) def flatten(l): """ Flatten everything that's a list! :Examples: >>> list(flatten([1,2,3,4])) >>> [1, 2, 3, 4] >>> list(flatten([[1,2],[3,4]])) >>> [1, 2, 3, 4] >>> list(flatten([[1,[2,3]],4])) >>> [1, 2, 3, 4] """ import collections for el in l: if isinstance(el, collections.Iterable) and not isinstance(el, str): for sub in flatten(el): yield sub else: yield el if mapping: unused_lineage, no_ancestor, no_all_desc = {}, {}, {} on_ids_source, on_ids_target = self._old_to_new_ids[-2:] #get the last two image2graph dict (label2vertex @ t_n-1 & t_n) for k, l in mapping.items(): l_flat = list(flatten(l)) # flatten the lineage (i.e. [[1,2],3] -> [1,2,3]) if disable_lineage_checking: for v in l_flat: try: eid = self.add_edge(on_ids_source[0][k], on_ids_target[0][v]) edge_types[eid] = self.TEMPORAL except: unused_lineage.update({k:v}) # Check if the mother cell and ALL daugthers are present in their respective topological graph : WE DON'T WANT TO CREATE A PARTIAL LINEAGE !!!! elif k in on_ids_source[0] and ( sum([v in on_ids_target[0].keys() for v in l_flat]) == len(l_flat) ): use_sub_lineage(k, l, on_ids_source, on_ids_target) for v in l_flat: eid = self.add_edge(on_ids_source[0][k], on_ids_target[0][v]) edge_types[eid] = self.TEMPORAL else: unused_lineage.update({k:l}) if k not in on_ids_source[0]: no_ancestor.update({k:l}) if not ( sum([v in on_ids_target[0].keys() for v in l_flat]) == len(l_flat) ): no_all_desc.update({k:l}) if unused_lineage != {}: print("Detected partial lineage info from t{} to t{} !!".format(current_index-1,current_index)) print(" - {} lineage infos could not be used...".format(len(unused_lineage))) print(" - this represent {} ({}/{}) of the initially provided mapping...".format(round(float(len(unused_lineage))/len(mapping),3)*100,len(unused_lineage),len(mapping))) #~ print "It is most likely that you are trying to add lineage between non-existant vertex in your spatial graphs!" #~ print "Check if you are not using an out-dated graph and erase temporary files (TPG creation...)." #~ print "Or maybe the lignage is detected as 'incomplete' because some cells have been removed before topological-graph computation." if no_ancestor != {}: print(" - {} have missing ancestors in their topological graph (at time {}).".format(len(no_ancestor), current_index-1)) if no_all_desc != {}: print(" - {} have missing descendants in their topological graph (at time {}).".format(len(no_all_desc), current_index)) return relabel_ids
[docs] def clear(self): PropertyGraph.clear(self) self._old_to_new_ids = []
def __to_set(self, s): if not isinstance(s, set): if isinstance(s, list): s=set(s) else: s=set([s]) return s
[docs] def vertex_temporal_index(self, vid): """ Return the temporal index of a vid `self.vertex_property('index')[vid]`.""" if isinstance(vid, list): return [self.vertex_temporal_index(v) for v in vid] else: return self.vertex_property('index')[vid]
[docs] def children(self, vid): """ Return children of the vertex vid :Parameters: - `vid` : a vertex id :Returns -------- - `children_list` : the set of the children of the vertex vid """ return self.out_neighbors(vid, 't')
[docs] def iter_children(self, vid): """ Return children of the vertex vid :Parameters: - `vid` : a vertex id :Returns -------- - `iterator` : an iterator on the set of the children of the vertex vid """ return iter(self.children(vid))
[docs] def has_children(self, vid): """ Return True if the vid `vid` has a child or children. """ if self.children(vid) != set(): return True else: return False
[docs] def parent(self, vid): """ Return parents of the vertex vid :Parameters: - `vid` : a vertex id :Returns -------- - `parents_list` : the set of the parents of the vertex vid """ return self.in_neighbors(vid, 't')
[docs] def iter_parent(self, vid): """ Return parents of the vertex vid :Parameters: - `vid` : a vertex id :Returns -------- - `iterator` : the set of the children of the vertex vid """ return iter(self.parent(vid))
[docs] def has_parent(self, vid): """ Return True if the vid `vid` has a parent. """ if self.parent(vid) != set(): return True else: return False
[docs] def sibling(self, vid): """ Return sibling of the vertex vid :Parameters: - `vid` : a vertex id :Returns -------- - `sibling_list` : the set of sibling of the vertex vid """ if self.parent(vid): return self.children(self.parent(vid).pop())-set([vid]) else: return None
[docs] def iter_sibling(self, vid): """ Return of the vertex vid :Parameters: - `vid` : a vertex id :Returns -------- - `iterator` : an iterator on the set of sibling of the vertex vid """ return iter(self.sibling(vid))
[docs] def descendants(self, vids, n = None): """ Return the 0, 1, ..., nth descendants of the vertex vid :Parameters: - `vids` : a set of vertex id :Returns -------- - `descendant_list` : the set of the 0, 1, ..., nth descendant of the vertex vid """ edge_type='t' neighbs=set() vids=self.__to_set(vids) if n==0 : return vids elif n==1 : for vid in vids: neighbs |= (self.out_neighbors(vid, edge_type) | set([vid])) return neighbs else : if n is None : n = self.nb_time_points-1 for vid in vids : neighbs |= (self.descendants(self.out_neighbors(vid, edge_type), n-1) | set([vid])) if list(neighbs)==self._vertices.keys(): return neighbs return neighbs
[docs] def iter_descendants(self, vids, n = None): """ Return the 0, 1, ..., nth descendants of the vertex vid :Parameters: - `vids` : a set of vertex id :Returns -------- - `iterator` : an iterator on the set of the 0, 1, ..., nth descendants of the vertex vid """ return iter(self.descendants(vids, n))
def rank_descendants(self, vid, rank=1): """ Return the descendants of the vertex vid only at a given rank :Parameters: - `vid` : a vertex id or a set of vertex id :Returns -------- - `descendant_list` : the set of the rank-descendant of the vertex vid or a list of set """ if isinstance(vid,list): return [self.rank_descendants(v,rank) for v in vid] else: return self.descendants(vid,rank)- self.descendants(vid,rank-1) def has_descendants(self, vid, rank=1): """ Return True if the vid `vid` has at least a descendant at `rank`. """ return self.rank_descendants(vid,rank) != set()
[docs] def rank_descendants(self, vid, rank=1): """ Return the descendants of the vertex vid only at a given rank :Parameters: - `vid` : a vertex id or a set of vertex id :Returns -------- - `descendant_list` : the set of the rank-descendant of the vertex vid or a list of set """ if isinstance(vid,list): return [self.rank_descendants(v,rank) for v in vid] else: return self.descendants(vid,rank)- self.descendants(vid,rank-1)
[docs] def has_descendants(self, vid, rank=1): """ Return True if the vid `vid` has at least a descendant at `rank`. """ return self.rank_descendants(vid,rank) != set()
[docs] def ancestors(self, vids, n = None): """Return the 0, 1, ..., nth ancestors of the vertex vid :Parameters: - `vids` : a set of vertex id :Returns -------- - `anestors_list` : the set of the 0, 1, ..., nth ancestors of the vertex vid """ edge_type='t' neighbs=set() vids=self.__to_set(vids) if n==0 : return vids elif n==1 : for vid in vids: neighbs |= (self.in_neighbors(vid, edge_type) | set([vid])) return neighbs else : if n is None: n = self.nb_time_points-1 for vid in vids : neighbs |= (self.ancestors(self.in_neighbors(vid, edge_type), n-1) | set([vid])) if list(neighbs)==self._vertices.keys(): return neighbs return neighbs
[docs] def iter_ancestors(self, vids, n): """ Return the 0, 1, ..., nth ancestors of the vertex vid :Parameters: - `vids` : a set of vertex id :Returns -------- - `iterator` : an iterator on the set of the 0, 1, ..., nth ancestors of the vertex vid """ return iter(self.ancestors(vids, n))
[docs] def rank_ancestors(self, vid, rank=1): """ Return the ancestor of the vertex vid only at a given rank :Parameters: - `vid` : a vertex id or a set of vertex id :Returns -------- - `descendant_list` : the set of the rank-ancestor of the vertex vid or a list of set """ if isinstance(vid,list): return [self.rank_ancestors(v,rank) for v in vid] else: return self.ancestors(vid,rank)- self.ancestors(vid,rank-1)
[docs] def has_ancestors(self, vid, rank=1): """ Return True if the vid `vid` has at least an ancestor at `rank`. """ return self.rank_ancestors(vid,rank) != set()
def _lineaged_as_ancestor(self, time_point=None, rank=1): """ Return a list of vertex lineaged as ancestors.""" if time_point is None: return [k for k in self.vertices() if self.has_descendants(k,rank)] else: return [k for k,v in self.vertex_property('index').items() if v==time_point and self.has_descendants(k,rank)] def _lineaged_as_descendant(self, time_point=None, rank=1): """ Return a list of vertex lineaged as descendants.""" if time_point is None: return [k for k in self.vertices() if self.has_ancestors(k,rank)] else: return [k for k,v in self.vertex_property('index').items() if v==time_point and self.has_ancestors(k,rank)] def _fully_lineaged_vertex(self, time_point=None): """ Return a list of fully lineaged vertex (from a given `time_point` if not None), i.e. lineaged from start to end. """ from vplants.tissue_analysis.temporal_graph_analysis import exist_all_relative_at_rank rank = self.nb_time_points-1 flv = self.descendants([k for k in self.vertex_at_time(0) if exist_all_relative_at_rank(self, k, rank)], rank) if time_point is None: return flv else: return [vid for vid in flv if self.vertex_temporal_index(vid)==time_point]
[docs] def lineaged_vertex(self, fully_lineaged=False, as_ancestor=False, as_descendant=False, lineage_rank=1): """ Return ids of lineaged vertices, with differents type of lineage possible: - a full lineage, i.e. only vids with a lineage from the first to the last time-point (fully_lineaged=True); - a lineage over several ranks, i.e. only vids with a lineage from the vid to the vid+lineage_rank time-point (fully_lineaged=False, lineage_rank=int); - an 'ancestors' based lineage (as_ancestor = True), i.e. only vids lineaged as ancestor (over lineage_rank if not None); - an 'descendants' based lineage (as_ancestor = True), i.e. only vids lineaged as descendants (over lineage_rank if not None). :Parameter: - `fully_lineaged` (bool) : if True (and lineage_rank is None), return vertices lineaged from the first to the last time-point (or from vid_time_point to vid_time_point + lineage_rank), otherwise return vertices having at least a parent or a child(ren); - `as_parent` (bool) : if True, return vertices lineaged as parents; - `as_children` (bool) : if True, return vertices lineaged as children; - 'lineage_rank' : int usefull if you want to check the lineage for a different rank than the rank-1 temporal neighborhood. """ from vplants.tissue_analysis.temporal_graph_analysis import exist_all_relative_at_rank if as_ancestor: vids_anc = self._lineaged_as_ancestor(time_point=None, rank=lineage_rank) else: vids_anc = self.vertices() if as_descendant: vids_desc = self._lineaged_as_descendant(time_point=None, rank=lineage_rank) else: vids_desc = self.vertices() if fully_lineaged: vids = self._fully_lineaged_vertex(time_point=None) else: vids = [k for k in self.vertices() if (exist_all_relative_at_rank(self, k, lineage_rank) or exist_all_relative_at_rank(self, k, -lineage_rank))] return list( set(vids) & set(vids_anc) & set(vids_desc) )
def _all_vertex_at_time(self, time_point): """ Return a list containing all vertex assigned to a given `time_point`.""" return [k for k,v in self.vertex_property('index').items() if v==time_point]
[docs] def vertex_at_time(self, time_point, lineaged=False, fully_lineaged=False, as_ancestor=False, as_descendant=False, lineage_rank=1): """ Return vertices ids corresponding to a given time point in the TPG. Optional parameters can be used to filter the list of vertices ids returned. :Parameters: - `time_point` (int) : integer giving which time point should be considered; - `lineaged` (bool) : if True, return vertices having at least a parent or a child(ren); - 'lineage_rank' : int usefull if you want to check the lineage for a different rank than the rank-1 temporal neighborhood; - `fully_lineaged` (bool) : if True, return vertices linked from the beginning to the end of the graph; - `as_parent` (bool) : if True, return vertices lineaged as parents; - `as_children` (bool) : if True, return vertices lineaged as children. """ if lineaged or fully_lineaged: vids = self.lineaged_vertex(fully_lineaged, as_ancestor, as_descendant, lineage_rank) return list(set(vids) & set(self._all_vertex_at_time(time_point))) else: return self._all_vertex_at_time(time_point)
[docs] def vertex_property_at_time(self, vertex_property, time_point, lineaged=False, fully_lineaged=False, as_ancestor=False, as_descendant=False, lineage_rank=1): """ Return the `vertex_property``for a given `time_point`. May be conditionned by extra temporal property: `lineaged`, `fully_lineaged`, `as_parent`, `as_children`. :Parameters: - `vertex_property` : str a string refering to an existing 'graph.vertex_property' to extract; - `time_point` (int) : integer giving which time point should be considered; - `lineaged` (bool) : if True, return vertices having at least a parent or a child(ren); - 'lineage_rank' : int usefull if you want to check the lineage for a different rank than the rank-1 temporal neighborhood; - `fully_lineaged` (bool) : if True, return vertices linked from the beginning to the end of the graph; - `as_parent` (bool) : if True, return vertices lineaged as parents; - `as_children` (bool) : if True, return vertices lineaged as children. """ vids = self.vertex_at_time(time_point, lineaged, fully_lineaged, as_ancestor, as_descendant) return dict([(k,self.vertex_property(vertex_property)[k]) for k in vids if k in self.vertex_property(vertex_property)])
[docs] def vertex_property_with_image_labels(self, vertex_property, time_point, lineaged=False, fully_lineaged=False, as_ancestor=False, as_descendant=False, lineage_rank=1): """ Return a subpart of graph.vertex_property(`vertex_property`) with relabelled keys into "images labels" thanks to the dictionary graph.vertex_property('old_labels'). Since "images labels" can be similar (not unique), it is mandatory to give a `time_point`. Additional parameters can be given and are related to the 'self.vertex_property_at_time' parameters. :Parameters: - `vertex_property` : str a string refering to an existing 'graph.vertex_property' to extract; - `time_point` : int time-point for which to return the `vertex_property`; - `lineaged` (bool) : if True, return vertices having at least a parent or a child(ren); - 'lineage_rank' : int usefull if you want to check the lineage for a different rank than the rank-1 temporal neighborhood; - `fully_lineaged` (bool) : if True, return vertices linked from the beginning to the end of the graph; - `as_parent` (bool) : if True, return vertices lineaged as parents; - `as_children` (bool) : if True, return vertices lineaged as children. :Returns -------- - *key_ vertex/cell label, *values_ `vertex_property` :Examples: graph.vertex_property_with_image_labels('volume', 0) """ from vplants.tissue_analysis.temporal_graph_analysis import translate_keys_Graph2Image return translate_keys_Graph2Image(self, self.vertex_property_at_time(vertex_property, time_point, lineaged, fully_lineaged, as_ancestor, as_descendant))
[docs] def edge_property_at_time(self, edge_property, time_point): """ Return a subpart of graph.edge_property(`edge_property`) with relabelled key-pair into "images labelpairs" thanks to the dictionary graph.vertex_property('old_labels'). :Parameters: - `vertex_property` : str a string refering to an existing 'graph.edge_property' to extract; - `time_point` : int time-point for which to return the `edge_property`; :Examples: graph.edge_property_with_image_labelpairs('wall_area', 0) """ from vplants.tissue_analysis.temporal_graph_from_image import edge2vertexpair_map eid2vidpair = edge2vertexpair_map(self, time_point) #~ return dict([(tuple(sorted([label1,label2])),self.edge_property(edge_property)[eid]) for eid,(label1,label2) in eid2vidpair.items() if self.edge_property(edge_property).has_key(eid)]) tmp_dict = {} for eid,(label1,label2) in eid2vidpair.items(): if eid in self.edge_property(edge_property): tmp_dict[tuple(sorted([label1,label2]))] = self.edge_property(edge_property)[eid] return tmp_dict
[docs] def edge_property_with_image_labelpairs(self, edge_property, time_point): """ Return a subpart of graph.edge_property(`edge_property`) with relabelled key-pair into "images labelpairs" thanks to the dictionary graph.vertex_property('old_labels'). :Parameters: - `vertex_property` : str a string refering to an existing 'graph.edge_property' to extract; - `time_point` : int time-point for which to return the `edge_property`; :Examples: graph.edge_property_with_image_labelpairs('wall_area', 0) """ from vplants.tissue_analysis.temporal_graph_from_image import edge2labelpair_map eid2labelpair = edge2labelpair_map(self, time_point) return dict([(tuple(sorted([label1,label2])),self.edge_property(edge_property)[eid]) for eid,(label1,label2) in eid2labelpair.items() if eid in self.edge_property(edge_property)])
[docs] def region_vids(self, region_name): """ Return a list of vids (TPG vertex id type) that belong to the region `region_name` according to graph. :Parameters: - `region_name` (str) : the name of a region (e.g. from `self.add_vertex_to_region()`) """ if 'regions' in list(self.vertex_properties()): return sorted(list(set([k for k,v in self.vertex_property('regions').items() for r in v if r==region_name]))) else: print("No property 'regions' added to the graph yet!") return None
[docs]def iterable(obj): try : iter(obj) return True except TypeError as te: return False