Receptor Tools - Cycle Fluxes and Derived Markov chain

Receptor Tools - Cycle Fluxes and Derived Markov chain#

def extract_first_cycle(sequence,cycle_shift=True):
    seen = {}
    start = None
    end = None

    for i, item in enumerate(sequence):
        if item in seen:
            if start is None:
                start = seen[item]
                end = i
            else:
                break
        else:
            seen[item] = i

    if start is not None and end is not None:
        if cycle_shift:
            return cyclic_shift(sequence[start:end+1]), sequence[0:start]+sequence[end:]
        else:
            return sequence[start:end+1], sequence[0:start]+sequence[end:]
    else:
        return None, sequence[end:]
def derived_graph(G,r=None,edge_labels='both'):
    if r==None:
        r=G.vertices(sort=True)[0]
    def doit(DerG):
        sink_list = DerG.sinks()
        if len(sink_list)==0:
            return DerG
        else:
            for w in sink_list:
                for u in G.neighbor_out_iterator(w[-1]):
                    cyc, seq = extract_first_cycle(w+(u,),cycle_shift=False)
                    if cyc==None:
                        cyc=()
                    if edge_labels=='cyc':
                        DerG.add_edge(w,seq,cyc)
                    elif edge_labels=='rate':
                        DerG.add_edge(w,seq,G.edge_label(w[-1],seq[-1]))
                    elif edge_labels=='both':
                        DerG.add_edge(w,seq,(G.edge_label(w[-1],seq[-1]),cyc))
        return doit(DerG)
    DerG = DiGraph()
    DerG.add_vertex((r,))
    return doit(DerG)
def cartesian_edge_color(GH,verbose=True):
    # Define edge colors
    R = rainbow(2)
    edge_colors = {}
    for i in range(2):
        edge_colors[R[i]] = []

    # Apply edge colors
    for edge in GH.edges():
        u, v, label = edge
        if u[0] == v[0]:
            edge_colors[R[0]].append(edge)
        if u[1] == v[1]:
            edge_colors[R[1]].append(edge)

    # Plot the Cartesian product with edge colors
    if verbose:
        GH.show(edge_colors=edge_colors)
        #GH.graphplot(vertex_labels=False, vertex_size=0,edge_colors=edge_colors).show()
    return edge_colors
def double_derived_graph_with_context(DerG,DerH,verbose=False):
    Vg = edge_ring_from_derived_graph(DerG)
    Vh = edge_ring_from_derived_graph(DerH)
    groot = DerG.vertices(sort=True)[0][0] # first element of tuple vertex = (0,) ==> groot = 0
    hroot = DerH.vertices(sort=True)[0][0]
    gh_vars = [];
    for g_var in Vg.gens():
        gh_vars += [str(g_var)+'%s'%(i+hroot) for i in range(H.order()) ]
    for h_var in Vh.gens():
        gh_vars += [str(h_var)+'%s'%(i+groot) for i in range(G.order()) ]
    gh_vars = list(set(gh_vars))
    Vgh = PolynomialRing(ZZ, names=gh_vars)
    DDerGH = DerG.cartesian_product(DerH)
    for edge in DDerGH.edges():
        DDerGH.set_edge_label(edge[0],edge[1],None)
        if edge[0][0]==edge[1][0]:
            hfr = edge[0][1]
            hto = edge[1][1]
            g = edge[0][0] # same as edge[1][0]
            if verbose:
                print('h changed:',edge,':',hfr,'->',hto,' in g context ', g[-1])
            gh_var = Vgh( str(DerH.edge_label(hfr,hto)[0] ) + str( g[-1] ))
            new_label = (gh_var,DerH.edge_label(hfr,hto)[1])
            DDerGH.set_edge_label(edge[0],edge[1],new_label)
        if edge[0][1]==edge[1][1]:
            gfr = edge[0][0]
            gto = edge[1][0]
            h = edge[0][1] # same as edge[1][1]
            if verbose:
                print('g changed:',edge,':',gfr,'->',gto,' in h context ', h[-1])
            gh_var = Vgh( str(DerH.edge_label(gfr,gto)[0] ) + str( h[-1] ))
            new_label = ( gh_var, DerG.edge_label(gfr,gto)[1])
            DDerGH.set_edge_label(edge[0],edge[1],new_label)
    return DDerGH
def double_derived_graph_without_context(DerG,DerH,verbose=False):
    DDerGH = DerG.cartesian_product(DerH)
    Vg = edge_ring_from_derived_graph(DerG)
    Vh = edge_ring_from_derived_graph(DerH)
    Vgh = merge_edge_rings(Vg,Vh)
    for edge in DDerGH.edges():
        DDerGH.set_edge_label(edge[0],edge[1],None)
        if edge[0][0]==edge[1][0]:
            hfr = edge[0][1]
            hto = edge[1][1]
            if verbose:
                print('h changed:',edge,':',hfr,'->',hto)
            new_label = (Vgh(DerH.edge_label(hfr,hto)[0]),DerH.edge_label(hfr,hto)[1])
            DDerGH.set_edge_label(edge[0],edge[1],new_label)
        if edge[0][1]==edge[1][1]:
            gfr = edge[0][0]
            gto = edge[1][0]
            if verbose:
                print('g changed:',edge,':',gfr,'->',gto)
            new_label = ( Vgh(DerG.edge_label(gfr,gto)[0]), DerG.edge_label(gfr,gto)[1])
            DDerGH.set_edge_label(edge[0],edge[1],new_label)
    return DDerGH
def double_derived_graph(DerG,DerH,context=False,reduce=False,verbose=False):
    if context:
        DDerGH = double_derived_graph_with_context(DerG,DerH,verbose=verbose)
    else:
        DDerGH = double_derived_graph_without_context(DerG,DerH,verbose=verbose)
    if reduce:
        if DerG == DerH:
            DDerGH = symmetrize(DDerGH,rates=True)
        else:
            print('Warning: Cannot reduce when DerG not equal to DerH')
    return DDerGH
def merge_edge_rings(Vg,Vh):
    gh_vars = [ str(g_var) for g_var in Vg.gens() ] + [ str(h_var) for h_var in Vh.gens() ]
    gh_vars = list(set(gh_vars))
    return PolynomialRing(ZZ, names=gh_vars)
def Cartesian_product_without_context(G, H):
    GH = G.cartesian_product(H)
    Vg = edge_ring_from_graph(G)
    Vh = edge_ring_from_graph(H)
    Vgh = merge_edge_rings(Vg,Vh)
    for v in GH.vertices(sort=True):
        for u in GH.vertices(sort=True):
            if v!=u:
                if v[0]==u[0] and H.has_edge(v[1],u[1]):
                    GH.set_edge_label(v,u,Vgh(H.edge_label(v[1],u[1])))
                if v[0]==u[0] and H.has_edge(u[1],v[1]):
                    GH.set_edge_label(u,v,Vgh(H.edge_label(u[1],v[1])))
                if v[1]==u[1] and G.has_edge(v[0],u[0]):
                    GH.set_edge_label(v,u,Vgh(G.edge_label(v[0],u[0])))
                if v[1]==u[1] and G.has_edge(u[0],v[0]):
                    GH.set_edge_label(u,v,Vgh(G.edge_label(u[0],v[0])))
    return GH
def Cartesian_product_with_context(G, H):
    Vg = edge_ring_from_graph(G)
    Vh = edge_ring_from_graph(H)
    groot = G.vertices(sort=True)[0]
    hroot = H.vertices(sort=True)[0]
    gh_vars = [];
    for g_var in Vg.gens():
        gh_vars += [str(g_var)+'%s'%(i+hroot) for i in range(H.order()) ]
    for h_var in Vh.gens():
        gh_vars += [str(h_var)+'%s'%(i+groot) for i in range(G.order()) ]
    gh_vars = list(set(gh_vars))
    Vgh = PolynomialRing(ZZ, names=gh_vars)
    GH = G.cartesian_product(H)
    for v in GH.vertices(sort=True):
        for u in GH.vertices(sort=True):
            if v!=u:
                if v[0]==u[0] and H.has_edge(v[1],u[1]):
                    gh_var = Vgh(str(H.edge_label(v[1],u[1])) + str(v[0]))
                    GH.set_edge_label(v,u,gh_var)
                if v[0]==u[0] and H.has_edge(u[1],v[1]):
                    gh_var = Vgh(str(H.edge_label(u[1],v[1])) + str(v[0]))
                    GH.set_edge_label(u,v,gh_var)
                if v[1]==u[1] and G.has_edge(v[0],u[0]):
                    gh_var = Vgh(str(G.edge_label(v[0],u[0])) + str(v[1]))
                    GH.set_edge_label(v,u,gh_var)
                if v[1]==u[1] and G.has_edge(u[0],v[0]):
                    gh_var = Vgh(str(G.edge_label(u[0],v[0])) + str(v[1]))
                    GH.set_edge_label(u,v,gh_var)
    return GH
def Cartesian_product(G, H, context=False):
    if context:
        GH = Cartesian_product_with_context(G, H)
    else:
        GH = Cartesian_product_without_context(G, H)
    return GH
def edge_ring_from_graph(G):
    Vg = G.random_edge()[2].parent()
    return Vg
def edge_ring_from_derived_graph(DerG):
    Vg = DerG.random_edge()[2][0].parent()
    return Vg
def weighted_laplacian_matrix(G):
    verts = G.vertices(sort=True)
    A = G.weighted_adjacency_matrix(verts)
    Q = A - diagonal_matrix(sum(A.T))
    return Q
def my_trees(Q,vdel=None,vdict=None,verbose=False,big=False): # Note: vdel needs to be a list, such as [0] or [0,3,4], or None 
    if vdel!=None:
        if isinstance(vdel, tuple):
            vdel = list(vdel)
        if isinstance(vdel, list) == False:
            raise ValueError("vdel must be a list of vertices or integers")
        if vdict==None:
            vd = vdel
        else:
            vd = [ vdict[key] for key in vdel ]
        if big==False:
            Q0 = Q.delete_rows(vd).delete_columns(vd)
            t = det(Q0)*(-1)^Q0.nrows()
        else: 
            G = DiGraph(Q)
            G.merge_vertices(vd)
            r = vd[0] 
            # Note: The next line is r=vd[0] because merge_vertices results
            # in the new vertex is named after the first vertex in the merge list
            # https://doc.sagemath.org/html/en/reference/graphs
            # /sage/graphs/generic_graph.html#sage.graphs.generic_graph.GenericGraph.merge_vertices
            G.remove_loops()
            VV = Q[0,0].parent() # the ring of edge labels 
            t = VV(0)
            for k,in_branching in enumerate(G.in_branchings(r)):
                edge_str = ''
                edge_prod = VV(1)
                for e in in_branching.edges():
                    edge_str += str(e[2])
                    edge_prod *= e[2]
                t += edge_prod
                if verbose:
                    in_branching.show(figsize=4,graph_border=True,\
                                      edge_labels=True,title=str(k)+': '+edge_str)
        if verbose:
            print('For root/cycle',vdel,':',t.factor(),'\n')
        return t
    else:
        T = []
        if vdict==None:
            for i in range(Q.ncols()):
                t = my_trees(Q,vdel=[i],verbose=verbose,big=big)
                T.append(t)
        else:
            for key in vdict.keys():
                t = my_trees(Q,vdel=[key],vdict=vdict,verbose=verbose,big=big)
                T.append(t)
        return T
def cycle_flux_analysis(G,verbose=True,str=''):
    if verbose:
        print()
        G.show(title=str,edge_labels=True,graph_border=True,figsize=5)
    nvG = G.order(); neG = G.size(); betaG = neG-nvG+1;
    vG = [ v for v in G.vertices(sort=True) ]
    dict_vG = dict(zip(vG,range(nvG))); 
    inverted_dict_vG = {value: key for key, value in dict_vG.items()}
    Q_G = weighted_laplacian_matrix(G)
    if verbose:
        print()
        print(Q_G)
        print()
    Tg = my_trees(Q_G,verbose=verbose)
    TG = sum(Tg)
    cycleG_p, cycleG_m, cycleG_2 = simple_cycles(G,minsize=2)
    cycleG = cycleG_p + cycleG_m + cycleG_2
    if verbose:
        print(str,'cycles p:',cycleG_p,'\n')
        print(str,'cycles m:',cycleG_m,'\n')
        print(str,'cycles 2:',cycleG_2,'\n')
        print()
    d_cycleG = { k:v for k,v in enumerate(cycleG) }
    inverted_d_cycleG = {value: key for key, value in d_cycleG.items()}
    Jg = []
    for i,cyc in enumerate(cycleG):
        Pi = make_pi_one_way(G,cyc)
        Jg.append(Pi*my_trees(Q_G,cyc,vdict=dict_vG,verbose=False))
        if verbose:
            print(str,'cycle flux',i,':',Jg[i].factor())
            print()
    return nvG, neG, betaG, Tg, TG, cycleG_p, cycleG_m, cycleG_2, cycleG, Jg, dict_vG, inverted_dict_vG, d_cycleG, inverted_d_cycleG
def simple_cycles(D,minsize=2,maxsize=1000):
    # Note: this uses cyclic_shift() function
    cycles_p0=[]
    for c in D.all_simple_cycles():
        if len(c)>max(minsize,3) and len(c)<=maxsize and list(reversed(c)) not in cycles_p0:
            cycles_p0.append(c)
    cycles_p=[]
    for c in cycles_p0:
        cycles_p.append(cyclic_shift(c))
    #
    cycles_m=[]
    for c in cycles_p:
        cycles_m.append(list(reversed(c)))
        # next line: this makes cp<cm for each pair
        # (cycle called positive is lexicographically smaller)
    for i in range(len(cycles_p)):
        cp = cycles_p[i]
        cm = cycles_m[i]
        if cp>cm: # want cp<cm
            cycles_p[i]=cm
            cycles_m[i]=cp
    cycles_p = [ tuple(c) for c in cycles_p]
    cycles_m = [ tuple(c) for c in cycles_m]

    # two-cycles
    if minsize==2:
        cycles_2_0=[]
        for c in D.all_simple_cycles():
            if len(c)==3 and list(reversed(c)) not in cycles_p0:
                cycles_2_0.append(c)
        cycles_2=[]
        for c in cycles_2_0:
            cycles_2.append(cyclic_shift(c))
        cycles_2 = [ tuple(c) for c in cycles_2]
        return cycles_p, cycles_m, cycles_2
    else:
        return cycles_p, cycles_m
def cyclic_shift(c,shift='default'): # 'default' shifts so that first element is minimal, presumes a simple cycle
    if c[-1]==c[0]:
        if shift=='default':
            c = c+c[1:]
            i = c.index(min(c))
            j = c.index(min(c),i+1) # i+1 is the starting index from which to search for min(c)
            return c[i:j+1]
        else:
            n=len(c)
            c = c+c[1:]
            return c[shift:shift+n]
    else:
        raise ValueError(str(c)+" is not a cycle.")
def make_pi_one_way(D,cyc):
    p = 1;
    for i in range(len(cyc)-1):
        p=p*D.edge_label(cyc[i],cyc[i+1])
    return p
def split_derived_graph(DerG):
    DerGrate = copy(DerG)
    DerGcyc = copy(DerG)
    for e in DerGrate.edges(sort=True):
        DerGrate.set_edge_label(e[0],e[1], DerG.edge_label(e[0],e[1])[0])
    for e in DerGcyc.edges(sort=True):
        DerGcyc.set_edge_label(e[0],e[1],DerG.edge_label(e[0],e[1])[1])
    return DerGrate, DerGcyc
def cycle_fluxes_from_derived_graph(DerG,verbose=False,big=False,factor=True):
    DerGrate, DerGcyc = split_derived_graph(DerG)
    vDerG = [ v for v in DerGcyc.vertices(sort=True) ]
    dict_vDerG = dict(zip(vDerG,range(len(vDerG))))
    Q_DerG = weighted_laplacian_matrix(DerGrate)
    T = my_trees(Q_DerG,vdel=None,vdict=None,verbose=False,big=big)
    if verbose:
        print('rooted spanning tree of derived graph T(0,0) =',sum(T).factor(),'\n')
    Jg = []
    dict_Jg_lumped = {}
    for e in DerGcyc.edges(sort=True):
        if DerGcyc.edge_label(e[0],e[1]):
            X = T[dict_vDerG[e[0]]]*DerGrate.edge_label(e[0],e[1]) # Note: do not factor X (if you do, you can't +=X below)
            if factor:
                Jg.append(X.factor())
            else:
                Jg.append(X)
            if verbose:
                print('J',e[2],'=', Jg[-1],'\n')
            cyc = cyclic_shift(DerGcyc.edge_label(e[0],e[1]))
            if cyc in dict_Jg_lumped:
                dict_Jg_lumped[cyc] += X
            else:
                dict_Jg_lumped[cyc] = X
    if factor:
        for key in dict_Jg_lumped:
            dict_Jg_lumped[key] = dict_Jg_lumped[key].factor()
    return dict_Jg_lumped
def graph_and_edge_ring_from_dict(dG):
    Gstr = DiGraph(dG)
    g_vars = [ e[2] for e in Gstr.edges(sort=True) ]
    Vg=PolynomialRing(ZZ,names=g_vars,order='invlex')
    G = copy(Gstr)
    for estr in Gstr.edges(sort=True):
        G.set_edge_label(estr[0],estr[1],Vg(estr[2]))
    return G, Vg
def symmetrize(DerGH,rates=False):
    DerGH_merge = copy(DerGH)
    if rates:
        for v in DerGH_merge.vertices(sort=True):
            if v[0]==v[1]:
                for w in list(DerGH_merge.neighbor_out_iterator(v)):
                    DerGH_merge.set_edge_label(v,w,( 2*DerGH_merge.edge_label(v,w)[0],\
                                                 DerGH_merge.edge_label(v,w)[1] ))
    for v in DerGH_merge.vertices(sort=True): 
        for u in DerGH_merge.vertices(sort=True):
            if v != u:
                if v[0] == u[1] and v[1] == u[0]:
                    DerGH_merge.merge_vertices([v,u])
    return DerGH_merge
dG_house = {0: {1: 'a', 2: 'c', 4: 'f'}, 1: {0: 'A', 2: 'b'}, 2: {0: 'C', 1: 'B', 3: 'd'}, 3: {2: 'D', 4: 'e'}, 4: {3: 'E', 0: 'F'}}
dG_c3_handle = {0: {1: 'a', 2: 'c'}, 1: {0: 'A', 2: 'b'}, 2: {0: 'C', 1: 'B', 3: 'd'}, 3: {2: 'D'}}
dG_c4_chord = {0: {1: 'a', 2: 'c', 3: 'e'}, 1: {0: 'A', 2: 'b'}, 2: {0: 'C', 1: 'B', 3: 'd'}, 3: {0: 'E', 2: 'D'}}
dG_c3_tail = {0: {1: 'a', 2: 'C'}, 1: {0: 'A', 2: 'b'}, 2: {0: 'c', 1: 'B', 3: 'd'}, 3: {2: 'd'}}
dG_p2 = {0: {1: 'a'}, 1: {0: 'A'}}
dG_p3 = {0: {1: 'a'}, 1: {0: 'A', 2: 'b'}, 2: {1: 'B'}}
dG_p4 = {0: {1: 'a'}, 1: {0: 'A', 2: 'b'}, 2: {1: 'B', 3: 'c'}, 3: {2: 'C'}}
dG_c3 = {0: {1:'a', 2:'C'}, 1: {0:'A', 2:'b'}, 2: {0:'c', 1:'B'}}
dH_c5 = {0: {1: 'v', 2: 'x', 4: 'Z'}, 1: {0: 'V', 2: 'w'}, 2: {1: 'W', 3: 'x'}, 3: {2: 'X', 4: 'y'}, 4: {0: 'z', 3: 'Y'}}
dH_c3 = {0: {1: 'x', 2: 'Z'}, 1: {0: 'X', 2: 'y'}, 2: {0: 'z', 1: 'Y'}}
dH_p4 = {0: {1: 'x'}, 1: {0: 'X', 2: 'y'}, 2: {1: 'Y', 3: 'z'}, 3: {2: 'Z'}}
dH_p3 = {0: {1: 'x'}, 1: {0: 'X', 2: 'y'}, 2: {1: 'Y'}}
dH_p2 = {0: {1: 'x'}, 1: {0: 'X'}}
dH_p2_r2 = {2: {3: 'x'}, 3: {2: 'X'}}
dH_c3_r3 = {3: {4: 'x', 5: 'Z'}, 4: {3: 'X', 5: 'y'}, 5: {3: 'z', 4: 'Y'}}
dH_p3_r3 = {3: {4: 'x'}, 4: {3: 'X', 5: 'y'}, 5: {4: 'Y'}}