import numpy as np
import scipy.sparse
import matplotlib.pyplot as plt
from matplotlib.collections import LineCollection
from pyjjasim.variable_row_array import VarRowArray
"""
Embedded Graph Module
"""
class NotSingleComponentError(Exception):
pass
class NotPlanarEmbeddingError(Exception):
pass
class NonSimpleError(Exception):
pass
class SelfLoopError(Exception):
pass
class NodeNotExistError(Exception):
pass
class EdgeNotExistError(Exception):
pass
[docs]class EmbeddedGraph:
"""
Class for embedded 2D graphs. Can be used to construct faces and check planarity.
Definitions:
- l-cycle:
Cycle generated by traversing the embedded graph always taking the leftmost turn at
every node.
- face-cycle:
An l-cycle with positive oriented area (using shoelace formula). If the graph is planar,
face cycles are the boundaries of faces.
Requirements:
- No self-loops
- Simple (no multigraph)
Parameters
----------
x, y : (N,) array
Coordinates of nodes of embedded graph.
node1, node2 : (E,) int array in range(N)
endpoint nodes of edges in embedded graph. Nodes are referred to by their index in
the coordinate arrays.
require_single_component=False :
If True, an error is raised if the graph is not single-component.
require_planar_embedding=False :
If True, an error is raised if the graph is not a planar embedding.
Raises
------
NonSimpleError
If graph is not simple.
SelfLoopError
if graph contains self-loops.
NotSingleComponentError
if graph has disconnected components and require_single_component=True.
NotPlanarEmbeddingError
If graph in not a planar embedding and require_planar_embedding=True.
"""
def __init__(self, x, y, node1, node2, require_single_component=False,
require_planar_embedding=False, _edges_are_sorted=None):
self.x = np.array(x, dtype=np.double).ravel()
self.y = np.array(y, dtype=np.double).ravel()
if len(self.x) != len(self.y):
raise ValueError("x and y must be same size")
self.N = len(self.x)
self.node1, self.node2 = np.array(node1, dtype=int).ravel(), np.array(node2, dtype=int).ravel()
self._assert_edges_correct_shape()
self.E = len(self.node1)
self._assert_edges_contain_existing_nodes()
self._assert_no_self_loop()
if _edges_are_sorted is None:
self.edge_permute = np.arange(self.edge_count())
self.edge_flip = np.zeros(self.edge_count(), dtype=bool)
self._sort_edges()
else:
self.edge_permute, self.edge_flip = _edges_are_sorted
self._assert_nonsimple()
self.edge_v_array = self._assign_edge_v_array()
# quantities are computed and stored only when a method needs them.
self.F = None
self.face_permutation = None
self.faces_v_array = None
self.face_edges = None
self.face_nodes = None
self.face_lengths = None
self.determinant = None
self.areas = None
self.roll_ids = None
self.boundary_face_indices = None
if require_single_component:
self._assert_single_component()
if require_planar_embedding:
self._assert_planar_embedding()
[docs] def get_edges(self):
"""
Return node endpoints of all edges.
Returns
-------
node1, node2 : (E,) int arrays in range(N)
Node endpoints of all edges.
"""
return self._de_sort_edges()
[docs] def get_edge_ids(self, node1, node2, return_direction=False):
"""
Return indices of edges with given endpoint nodes.
Parameters
----------
node1, node2 : arrays in range(N)
Endpoints of edges.
return_direction=False : bool
Returns
-------
edge_ids : array with same size as node1 in range(E)
Returns indices of edges.
edge_directions : bool array with same size as node1
True if aligned with edge direction
Raises
------
EdgeNotExistError
If a queried node-pair does not exist.
"""
node1, node2 = np.array(node1, dtype=int).ravel(), np.array(node2, dtype=int).ravel()
mask = node1 < node2
node1, node2 = np.where(mask, node1, node2), np.where(mask, node2, node1)
if np.any(node1 > self.node1[-1]):
raise EdgeNotExistError("queried edge that does not exist")
starts, ends = self.edge_v_array.row_ranges()
node1_idx = np.searchsorted(self.node1[starts], node1)
starts, ends = starts[node1_idx], ends[node1_idx]
while not np.all(node2 <= self.node2[starts]):
starts[node2 > self.node2[starts]] += 1
if not np.all(starts < ends):
raise EdgeNotExistError("queried edge that does not exist")
if not np.all((self.node1[starts] == node1) & (self.node2[starts] == node2)):
raise EdgeNotExistError("queried edge that does not exist")
if not return_direction:
return self.edge_permute[starts]
else:
e = self.edge_permute[starts]
return e, self.edge_flip[e] != mask
[docs] def add_nodes(self, x, y):
"""
Add nodes to graph.
Parameters
----------
x, y : arrays
Coordinates of the nodes to be added to the graph
Returns
-------
new_graph : :py:attr:`EmbeddedGraph`
New graph with nodes added to it.
"""
return EmbeddedGraph(np.append(self.x, np.array(x, dtype=np.double).ravel()), np.append(self.y, np.array(y, dtype=np.double).ravel()), self.node1, self.node2, require_single_component=False)
[docs] def add_edges(self, node1, node2, require_single_component=False, require_planar_embedding=False):
"""
Add edges to graph.
Parameters
----------
node1, node2 : arrays in range(N)
Endpoints of edges.
require_single_component=False :
If True; raises error if the resulting graph is not single-component.
require_planar_embedding=False :
If True; raises error if the resulting graph is not a planar embedding.
Returns
-------
new_graph : :py:attr:`EmbeddedGraph`
New graph with edges added to it.
"""
return EmbeddedGraph(self.x, self.y, np.append(self.node1, np.array(node1, dtype=int).ravel()),
np.append(self.node2, np.array(node2, dtype=int).ravel()),
require_single_component=require_single_component,
require_planar_embedding=require_planar_embedding)
[docs] def add_nodes_and_edges(self, x, y, node1, node2, require_single_component=False,
require_planar_embedding=False):
"""
Add nodes and edges to graph.
Parameters
----------
x, y : arrays
coordinates of the nodes to be added to the graph
node1, node2 : arrays in range(N + x.size)
endpoints of edges. The i-th new node must be referred to by index N + i.
require_single_component=False :
If True; raises error if the resulting graph is not single-component
require_planar_embedding=False :
If True; raises error if the resulting graph is not a planar embedding
Returns
-------
new_graph : :py:attr:`EmbeddedGraph`
new graph with nodes and edges added to it.
"""
return EmbeddedGraph(np.append(self.x, np.array(x, dtype=np.double).ravel()),
np.append(self.y, np.array(y, dtype=np.double).ravel()),
np.append(self.node1, np.array(node1, dtype=int).ravel()),
np.append(self.node2, np.array(node2, dtype=int).ravel()),
require_single_component=require_single_component,
require_planar_embedding=require_planar_embedding)
[docs] def remove_nodes(self, node_ids, require_single_component=False, require_planar_embedding=False):
"""
Remove nodes from graph.
Parameters
----------
node_ids : int array in range(N)
Indices of nodes to be removed.
require_single_component=False :
If True; raises error if the resulting graph is not single-component.
require_planar_embedding=False :
If True; raises error if the resulting graph is not a planar embedding.
Returns
-------
new_graph : :py:attr:`EmbeddedGraph`
New graph with nodes removed from it.
"""
node_ids = np.array(node_ids, dtype=int).ravel()
node_map = np.ones(self.node_count(), dtype=int)
node_map[node_ids] = 0
node_map = np.cumsum(node_map) - node_map
node_map[node_ids] = -1
n1, n2 = node_map[self.node1], node_map[self.node2]
edge_ids = (n1 >= 0) & (n2 >= 0)
return EmbeddedGraph(np.delete(self.x, node_ids), np.delete(self.y, node_ids),
n1[edge_ids], n2[edge_ids], require_single_component=require_single_component,
require_planar_embedding=require_planar_embedding)
[docs] def remove_edges_by_ids(self, edge_ids, require_single_component=False, require_planar_embedding=False):
"""
Remove edges from graph with edge ids as input.
Parameters
----------
edge_ids : int array in range(E)
Indices of nodes to be removed.
require_single_component=False :
If True; raises error if the resulting graph is not single-component.
require_planar_embedding=False :
If True; raises error if the resulting graph is not a planar embedding.
Returns
-------
new_graph : :py:attr:`EmbeddedGraph`
new graph with edges removed from it.
"""
ids_internal = self.edge_permute_inverse[edge_ids]
return EmbeddedGraph(self.x, self.y, np.delete(self.node1, ids_internal),
np.delete(self.node2, ids_internal), _edges_are_sorted=True,
require_single_component=require_single_component,
require_planar_embedding=require_planar_embedding)
[docs] def remove_edges(self, node1, node2, require_single_component=False, require_planar_embedding=False):
"""
Remove edges from graph with node endpoints as input.
Parameters
----------
node1, node2 : int arrays in range(N)
The indices of node endpoints of edges to be removed.
require_single_component=False :
If True; raises error if the resulting graph is not single-component.
require_planar_embedding=False :
If True; raises error if the resulting graph is not a planar embedding.
Returns
-------
new_graph : :py:attr:`EmbeddedGraph`
new graph with edges removed from it.
Raises
------
EdgeNotExistError
If a queried node-pair does not exist.
"""
return self.remove_edges_by_ids(self.get_edge_ids(node1, node2),
require_single_component=require_single_component,
require_planar_embedding=require_planar_embedding)
[docs] def coo(self):
"""
Returns coordinates of nodes.
Returns
-------
x, y : (N,) arrays
Coordinates of nodes in graph.
"""
return self.x, self.y
[docs] def node_count(self):
"""
Returns number of nodes in graph (abbreviated by N).
"""
return self.N
[docs] def edge_count(self):
"""
Returns number of edges in graph (abbreviated by E).
"""
return self.E
[docs] def get_l_cycles(self, to_list=True):
"""
Returns for all l-cycles the nodes it traverses in order.
Parameters
----------
to_list=False :
If true, output is in form of list-of-lists, otherwise concatenated array.
Returns
-------
nodes : list-of-lists or array
For all l-cycles the nodes it traverses in order.
lengths : (FR,) int array
Length of every l-cycle.
"""
self._assign_faces()
self._assign_face_nodes()
nodes, lengths = self.face_nodes, self.face_lengths
if to_list:
return VarRowArray(lengths).to_list(nodes), lengths
return nodes, lengths
[docs] def get_face_cycles(self, to_list=True):
"""
Returns for all face-cycles the nodes it traverses in order. If the graph is planar,
these enclose individual faces.
Parameters
----------
to_list=False :
If true, output is in form of list-of-lists, otherwise concatenated array.
Returns
-------
nodes : list-of-lists or array
For all face-cycles the nodes it traverses in order.
lengths : (F,) int array
Length of every face-cycle.
"""
self._assign_faces()
self._assign_face_nodes()
self._assign_boundary_faces()
nodes = self.faces_v_array.delete_rows(self.face_nodes, self.boundary_face_indices)
lengths = np.delete(self.face_lengths, self.boundary_face_indices)
if to_list:
return VarRowArray(lengths).to_list(nodes), lengths
return nodes, lengths
[docs] def face_count(self):
"""
Returns number of face cycles in graph (abbreviated by F). In a planar
graph this equals the number of faces.
"""
self._assign_faces()
self._assign_boundary_faces()
return self.F - len(self.get_boundary_faces())
[docs] def l_cycle_count(self):
"""
Returns number of l-cycles in graph. (abbreviated by FR)
"""
self._assign_faces()
return self.F
[docs] def is_planar_embedding(self):
"""
Returns if graph is planar embedding, which is true if edges only intersect at their endpoints.
"""
self._assign_faces()
return self.node_count() + self.face_count() == self.edge_count() + 1
[docs] def get_face_areas(self):
"""
Returns areas of face-cycles. These correspond to face-areas is the graph
is planar.
Returns
-------
areas : (F,) array
Areas of face-cycles.
"""
self._assign_areas()
self._assign_boundary_faces()
return np.delete(self.areas, self.boundary_face_indices)
[docs] def get_l_cycle_areas(self):
"""
Returns signed areas of l-cycles.
Returns
-------
areas : (FR,) array
Signed areas of l-cycles.
"""
self._assign_areas()
return self.areas
[docs] def get_l_cycle_centroids(self):
"""
Returns centroids of l_cycles in graph
Returns
-------
x, y : (F,) arrays
Returns coordinates of centroids of l_cycles.
"""
six_times_area = 6.0 * self.get_l_cycle_areas()
mask = np.isclose(six_times_area, 0.0)
six_times_area[mask] = 2.0 * self.face_lengths[mask]
long_mask = self.faces_v_array.at_out_index(mask)
X = self.x[self.face_nodes] + self.x[self.roll_ids]
Y = self.y[self.face_nodes] + self.y[self.roll_ids]
centroid_x = self.faces_v_array.sum(np.where(long_mask, X, self.determinant * X)) / six_times_area
centroid_y = self.faces_v_array.sum(np.where(long_mask, Y, self.determinant * Y)) / six_times_area
return centroid_x, centroid_y
[docs] def get_face_centroids(self):
"""
Returns centroids of faces in graph
Returns
-------
x, y : (F,) arrays
Returns coordinates of centroids of faces.
"""
centroid_x, centroid_y = self.get_l_cycle_centroids()
self._assign_boundary_faces()
return np.delete(centroid_x, self.boundary_face_indices), \
np.delete(centroid_y, self.boundary_face_indices)
[docs] def get_num_components(self):
"""
Returns number of connected components of graph.
"""
return scipy.sparse.csgraph.connected_components(
self.adjacency_matrix(), directed=False, return_labels=False)
[docs] def get_components(self):
"""
Returns components of graph.
Returns
-------
components : (N,) int array in range(num_components)
Components of graph.
"""
_, components = scipy.sparse.csgraph.connected_components(
self.adjacency_matrix(), directed=False, return_labels=True)
return components
[docs] def split_components(self):
"""
Returns list of individual components of graph as separate EmbeddedGraph objects.
"""
components = self.get_components()
num_components = np.max(components) + 1
return [self.remove_nodes(np.flatnonzero(components != c)) for c in range(num_components)]
[docs] def get_boundary_faces(self):
"""
Returns indices of l_cycles that are boundary faces.
"""
self._assign_boundary_faces()
return self.boundary_face_indices
[docs] def l_cycle_matrix(self):
"""
Returns cycle matrix build with l_cycles.
Cycle matrix is an (F, E) sparse matrix where the i-th row corresponds to the
i-th l_cycle. The entry is +1 or -1 if the cycle contains that edge.
The value +1 is assigned if traversing the cycle counter clockwise first encounters
the endpoint node of the edge with lowest id. If not, -1 is assigned.
"""
self._assign_faces()
return self._cycle_matrix(self.face_edges, self.face_lengths, self.edge_permute, self.edge_flip)
def _cycle_matrix(self, face_edges, face_lengths, permute, edge_flip):
E, F = self.edge_count(), len(face_lengths)
indptr = np.append([0], np.cumsum(face_lengths))
indices, data = permute[face_edges % E], 1 - 2 * (face_edges // E)
data = np.where(edge_flip[indices], -data, data)
return scipy.sparse.csr_matrix((data, indices, indptr), shape=(F, E)).tocsc()
[docs] def face_cycle_matrix(self):
"""
Returns cycle matrix build with face cycles
Cycle matrix is an (F, E) sparse matrix where the i-th row corresponds to the
i-th face. The entry is +1 or -1 if the cycle contains that edge.
The value +1 is assigned if traversing the cycle counter clockwise first encounters
the endpoint node of the edge with lowest id. If not, -1 is assigned.
"""
self._assign_faces()
self._assign_boundary_faces()
nb_face_edges = self.face_edges[self.faces_v_array.get_item(rows=self._non_boundary_mask())]
nb_face_lengths = self.face_lengths[self._non_boundary_mask()]
return self._cycle_matrix(nb_face_edges, nb_face_lengths, self.edge_permute, self.edge_flip)
[docs] def cut_space_matrix(self):
"""
Returns cut-space matrix.
Cut-space matrix is an (N, E) sparse matrix where the i-th row corresponds to the
i-th node. The entry is +1 or -1 if the node is an endpoint of that edge. It is +1
if the node is the endpoint with highest idx, -1 otherwise.
"""
E, N = self.edge_count(), self.node_count()
n1, n2 = self.get_edges()
row = np.concatenate((n1, n2))
col = np.concatenate((np.arange(E), np.arange(E)))
data = np.concatenate((-np.ones(E), np.ones(E)))
return scipy.sparse.coo_matrix((data, (row, col)), shape=(N, E)).tocsc()
[docs] def adjacency_matrix(self):
"""
Returns adjacency matrix.
Adjacency matrix is an (N, N) sparse symmetric matrix where a node-pair
entry is 1 if the nodes share an edge, 0 if not.
"""
E, N = self.edge_count(), self.node_count()
data = np.ones(2 * E)
row = np.append(self.node1, self.node2)
col = np.append(self.node2, self.node1)
return scipy.sparse.coo_matrix((data, (row, col)), shape=(N, N)).tocsc()
[docs] def get_common_edge_of_l_cycles(self, cycle1, cycle2, return_orientation=False):
"""
Returns index of an edge occurring in both cycles if it exists; otherwise -1.
Parameters
----------
cycle1, cycle2 : int arrays in range(F)
Indices referring to l_cycles.
return_orientation=False :
If True, orientation of edge with respect to cycle pair is returned.
Returns
-------
shared_edge : array in range(E) with same size as cycle1,2
Edge shared by l_cycle1 and l_cycle2. If multiple exist;
returns one with the lowest index.
orientation : bool array with same size as cycle1,2 (optional)
True if cycle1 passes edge counterclockwise first encountering its
node with lowest index.
"""
self._assign_faces()
f = self.faces_v_array.rows()[np.argsort(self.face_edges)].astype(np.int64)
E = self.edge_count()
f1, f2 = f[:E], f[E:2 * E]
mask = f1 < f2
f1, f2 = np.where(mask, f1, f2), np.where(mask, f2, f1)
sorter = np.lexsort((f2, f1))
f1, f2 = f1[sorter], f2[sorter]
A = np.array(list(zip(f1, f2)), dtype=[('f1', 'int'), ('f2', 'int')])
cycle1, cycle2 = np.array(cycle1, dtype=int), np.array(cycle2, dtype=int)
input_shape = cycle1.shape
cycle1, cycle2 = cycle1.ravel(), cycle2.ravel()
mask2 = cycle1 < cycle2
fmin, fmax = np.where(mask2, cycle1, cycle2).astype(np.int64), np.where(mask2, cycle2, cycle1).astype(np.int64)
V = np.array(list(zip(fmin, fmax)), dtype=[('fmin', 'int'), ('fmax', 'int')])
edge_ids = np.searchsorted(A, V)
edge_ids[edge_ids >= E] = E - 1
if return_orientation:
orientation = mask[sorter][edge_ids] ^ ~mask2
found_mask = (f1[edge_ids] != fmin) | (f2[edge_ids] != fmax)
edge_ids = sorter[edge_ids]
edge_ids[found_mask] = -1
ids = self.edge_permute[edge_ids.reshape(input_shape)]
if return_orientation:
orientation[found_mask] = False
return ids, orientation.reshape(input_shape) ^ self.edge_flip[ids]
else:
return ids
[docs] def permute_nodes(self, permutation):
"""
Permute node order. Because edges refer to nodes by their position,
this also changes node1 and node2.
"""
permutation = np.array(permutation, dtype=int)
if not np.all(np.sort(permutation) == np.arange(self.node_count())):
raise ValueError("invalid permutation")
self.x = self.x[permutation]
self.y = self.y[permutation]
inv_perm = np.argsort(permutation)
self.node1 = inv_perm[self.node1]
self.node2 = inv_perm[self.node2]
self.node1, self.node2 = self._de_sort_edges()
self._sort_edges()
self._recompute_faces()
[docs] def permute_faces(self, permutation):
"""
Permute face order.
"""
permutation = np.array(permutation, dtype=int)
if not np.all(np.sort(permutation) == np.arange(self.l_cycle_count())):
raise ValueError("invalid permutation")
self.face_permutation = self.face_permutation[permutation]
self._recompute_faces()
def face_dual_graph(self):
A = self.face_cycle_matrix()
adj = (A @ A.T).tocoo()
mask = adj.row < adj.col
return EmbeddedGraph(*self.get_face_centroids(), adj.row[mask], adj.col[mask])
[docs] def locate_faces(self, x, y):
"""
Get faces whose centroids are closest to queried coordinate.
Graph must be planar embedding.
Attributes
----------
x, y: arrays :
Coordinates at which one wants to locate faces.
Returns
-------
face_ids : int array with same size as x in range(F)
Returns indices of located faces.
"""
if not self.is_planar_embedding():
raise ValueError("Only works for planar embedding")
locator = scipy.spatial.KDTree(np.stack(self.get_face_centroids(), axis=-1))
_, face_ids = locator.query(np.stack(np.broadcast_arrays(x, y), axis=-1), k=1)
return face_ids, locator
[docs] def plot(self, fig=None, ax=None, show_cycles=True, cycles="face_cycles", figsize=[5, 5],
show_node_ids=False, show_edge_ids=False, show_face_ids=False,
face_shrink_factor=0.9, markersize=5, linewidth=1, ax_position=None, title=""):
"""
Visualize embedded graph. Can optionally show indices of nodes, edges and faces.
Attributes
----------
fig=None: None or matplotlib figure :
Figure to embed the axis in. If None, new figure is created.
ax=None: None or matplotlib axes :
Axes to plot the graph in. If None, a new axis is created.
show_cycles=True : bool
If true, the cycles are displayed.
cycles="face_cycles" : str
What cycles to show. Options are "face_cycles" or "l_cycles".
figsize=[5, 5] : array-like
Size of figure.
show_node_ids=False : bool
If True, node ids are shown.
show_edge_ids=False : bool
If True, edge ids are shown.
show_face_ids=False : bool
If True, face ids are shown.
face_shrink_factor=0.9 : float
Shrinks faces around their centroid. This ensures they do not overlap with edges and are
easier to make out.
markersize=5 : int
Size of nodes.
linewidth=1 : int
Width of lines representing edges and faces.
ax_position=None : None or array-like
Manual position of axis.
title="" : str
Title of figure.
Returns
-------
fig : matplotlib figure handle
Returns figure handle
ax : matplotlib axis handle
Returns axis handle
"""
def _face_line(x, y, n, xcn, ycn, f_shrink):
xp = f_shrink * x[np.append(n, n[0])] + (1 - f_shrink) * xcn
yp = f_shrink * y[np.append(n, n[0])] + (1 - f_shrink) * ycn
return np.stack((xp, yp), axis=1)
if fig is None and ax is None:
fig, ax = plt.subplots(figsize=figsize)
if fig is None:
fig = plt.figure()
if ax is None:
if ax_position is not None:
ax = fig.add_axes(ax_position)
else:
ax = fig.add_axes([0.125,0.11,0.775,0.77], label=str(np.random.randi(0, 10**5, 1)))
if ax_position is not None:
ax.set_position(ax_position)
self.fig, self.ax = fig, ax
x, y = self.coo()
n1, n2 = self.get_edges()
lines = [((x[n1[i]], y[n1[i]]), (x[n2[i]], y[n2[i]])) for i in range(len(n1))]
lc = LineCollection(lines, colors=[0.5,0.5,0.5], linewidths=linewidth)
self.ax.add_collection(lc)
self.ax.plot(x, y, color=[0,0,0], marker="o", markerfacecolor=[0,0,0], linestyle="None", markersize=markersize)
if show_node_ids:
for i, (xn, yn) in enumerate(zip(x, y)):
self.ax.text(xn, yn, i.__str__())
if show_edge_ids:
x1, y1, x2, y2 = x[n1], y[n1], x[n2], y[n2]
for i, (xn, yn) in enumerate(zip(0.5 * (x1 + x2), 0.5 * (y1 + y2))):
self.ax.annotate(i.__str__(), (xn, yn), color=[0.3, 0.5, 0.9], ha='center', va='center')
if show_cycles:
lines = []
if cycles == "face_cycles":
xc, yc = self.get_face_centroids()
pn, _ = self.get_face_cycles(to_list=True)
for i, (xcn, ycn, n) in enumerate(zip(xc, yc, pn)):
lines += [_face_line(x, y, n, xcn, ycn, face_shrink_factor)]
if show_face_ids:
self.ax.annotate(i.__str__(), (xcn, ycn), color=[1, 0.5, 0.2], ha='center', va='center')
if cycles == "l_cycles":
xc, yc = self.get_l_cycle_centroids()
pn, _ = self.get_l_cycles(to_list=True)
b_mask = ~self._non_boundary_mask()
lines = []
for i, (xcn, ycn, n) in enumerate(zip(xc, yc, pn)):
if b_mask[i]:
xp, yp = x[n], y[n]
self.ax.plot(np.append(xp, xp[0]), np.append(yp, yp[0]),
color=[0.2, 0.5, 1], linewidth=linewidth)
if show_face_ids:
self.ax.annotate(i.__str__(), (xcn, ycn), color=[0.2, 0.5, 1], ha='center', va='center')
else:
lines += [_face_line(x, y, n, xcn, ycn, face_shrink_factor)]
if show_face_ids:
self.ax.annotate(i.__str__(), (xcn, ycn), color=[1, 0.5, 0.2], ha='center', va='center')
lc1 = LineCollection(lines, colors=[1, 0.5, 0.2], linewidths=linewidth)
self.ax.add_collection(lc1)
self.ax.set_title(title)
return self.fig, self.ax
[docs] def shortest_path(self, node1, node2, to_list=True):
"""
Returns nodes encountered along a shortest path from node1 to node2 through graph.
- paths include node1 and node2.
- If node1, node2 are (P,) arrays, returns P paths where path i goes from node1[i] to node2[i].
Parameters
----------
node1, node2 : (P,) array
Start- and end-nodes of paths.
to_list=False :
If true, output is in form of list-of-lists, otherwise concatenated array.
Returns
-------
paths : list-of-lists or array
All nodes that are traversed in path
lengths : (FR,) int array
Length of every l-cycle.
"""
node1, node2 = np.atleast_1d(node1), np.atleast_1d(node2)
P = len(node2)
up1, idx = np.unique(node2, return_inverse=True)
_, pred = scipy.sparse.csgraph.shortest_path(self.adjacency_matrix(), return_predecessors=True, indices=up1)
K = pred.shape[1]
pred[pred < 0] = K
pred = np.concatenate((pred, K * np.ones((pred.shape[0], 1), dtype=int)), axis=1)
p = np.array(node1).copy()
out = np.zeros((5, P), dtype=int)
out[0, :] = node1
k = 1
while not np.all(p == K):
if k >= (out.shape[0] - 1):
out = np.concatenate((out, 0 * out), axis=0)
p = pred[(idx, p)]
out[k, :] = p
k += 1
out = out[:k, :]
path_lengths = np.argmax(out == K, axis=0)
out = out.T
paths = out[out < K].ravel()
if to_list:
return VarRowArray(path_lengths).to_list(paths), path_lengths
return paths, path_lengths
def _assert_edges_correct_shape(self):
if len(self.node1) != len(self.node2):
raise ValueError("node1 and node2 must be same size")
def _assert_edges_contain_existing_nodes(self):
if np.any((self.node1 < 0) | (self.node1 >= self.N) | (self.node2 < 0) | (self.node2 >= self.N)):
raise NodeNotExistError("node1,2 values must be in range(N)")
def _assert_single_component(self):
if self.get_num_components() != 1:
raise NotSingleComponentError()
def _assert_planar_embedding(self):
if not self.is_planar_embedding():
raise NotPlanarEmbeddingError()
def _assert_no_self_loop(self):
if np.any(self.node1 == self.node2):
raise SelfLoopError("no edge is allowed to have identical end nodes.")
def _assert_nonsimple(self):
if np.any((self.node1[:-1] == self.node1[1:]) & (self.node2[:-1] == self.node2[1:])):
raise NonSimpleError("no duplicate edges allowed.")
def _assign_edge_v_array(self):
self.edge_v_array = VarRowArray(np.diff(np.append(np.flatnonzero(np.roll(self.node1, 1) - self.node1), len(self.node1))))
return self.edge_v_array
def _sort_edges(self):
self.edge_flip = self.node1 > self.node2
nn1 = np.where(self.edge_flip, self.node2, self.node1)
nn2 = np.where(self.edge_flip ,self.node1, self.node2)
self.edge_permute = np.lexsort((nn2, nn1))
self.edge_permute_inverse = np.argsort(self.edge_permute)
self.node1, self.node2 = nn1[self.edge_permute], nn2[self.edge_permute]
def _de_sort_edges(self):
nn1, nn2 = self.node1[self.edge_permute_inverse], self.node2[self.edge_permute_inverse]
return np.where(self.edge_flip, nn2, nn1), np.where(self.edge_flip, nn1, nn2)
def _reset_precomputed_quantities(self):
self.F = None
self.faces_v_array = None
self.face_edges = None
self.face_nodes = None
self.face_lengths = None
self.determinant = None
self.areas = None
self.roll_ids = None
self.boundary_face_indices = None
def _assign_faces(self):
if self.face_edges is None:
self.face_edges, self.face_lengths = self._construct_l_cycles()
self.faces_v_array = VarRowArray(self.face_lengths)
self.F = len(self.face_lengths)
if self.face_permutation is None:
self.face_permutation = np.arange(self.F)
def _assign_face_nodes(self):
self._assign_faces()
if self.face_nodes is None:
e_idx = self.face_edges % self.edge_count()
self.face_nodes = np.where(self.face_edges < self.edge_count(), self.node1[e_idx], self.node2[e_idx])
def _assign_roll_ids(self):
self._assign_face_nodes()
if self.roll_ids is None:
self.roll_ids = self.face_nodes[self.faces_v_array.roll(1)]
def _assign_determinant(self):
self._assign_roll_ids()
if self.determinant is None:
self.determinant = self.x[self.roll_ids] * self.y[self.face_nodes] - self.x[self.face_nodes] * self.y[self.roll_ids]
def _assign_areas(self):
self._assign_determinant()
if self.areas is None:
self.areas = 0.5 * self.faces_v_array.sum(self.determinant)
def _assign_boundary_faces(self):
self._assign_areas()
if self.boundary_face_indices is None:
self.boundary_face_indices = np.flatnonzero((self.areas < 0) | np.isclose(self.areas, 0.0))
def _recompute_faces(self):
self._reset_precomputed_quantities()
permutation = self.face_permutation
inv_permutation = np.argsort(permutation)
self._assign_faces()
self._assign_face_nodes()
self._assign_areas()
self._assign_determinant()
self._assign_roll_ids()
self._assign_boundary_faces()
self.face_lengths = self.face_lengths[permutation]
self.face_edges = self.faces_v_array.permute_rows(permutation, self.face_edges)
self.areas = self.areas[permutation]
self.determinant = self.faces_v_array.permute_rows(permutation, self.determinant)
self.roll_ids = self.faces_v_array.permute_rows(permutation, self.roll_ids)
self.boundary_face_indices = inv_permutation[self.boundary_face_indices]
self.face_nodes = self.faces_v_array.permute_rows(permutation, self.face_nodes)
self.faces_v_array = VarRowArray(self.faces_v_array.counts[permutation])
def _get_edge_map(self):
"""
Computes a one-to-one map over directed edges, where the image contains the edge
which is encountered next when traversing the graph moving counter-clockwise.
Used to generate faces by repeating indexing: e_(i+1) = map[e_i]
map: np.arange(2*E) -> sorted_out_edge_directed
Output:
sorted_out_edge_directed (2*E,) in range(2*E)
"""
counter_clockwise = True
edge_count = self.edge_count()
ns = np.append(self.node1, self.node2)
# construct neighbour structure
nodes, count = np.unique(ns, return_counts=True)
neighbours = VarRowArray(count)
neighbour_edges = np.tile(np.arange(edge_count), 2)[np.argsort(ns)]
neighbour_node_self = nodes[neighbours.rows()]
neighbour_node_other = np.where(self.node1[neighbour_edges] == neighbour_node_self,
self.node2[neighbour_edges], self.node1[neighbour_edges])
# sort neighbour in-dimension by ascending angle
angles = np.arctan2(self.y[neighbour_node_other] - self.y[neighbour_node_self],
self.x[neighbour_node_other] - self.x[neighbour_node_self])
angle_arg_sort = np.argsort(3 * np.pi * neighbour_node_self.astype(np.double) + angles)
neighbour_edges = neighbour_edges[angle_arg_sort]
neighbour_node_other = neighbour_node_other[angle_arg_sort]
def to_directed(edge_nr, edge_direction, edge_count):
return edge_nr + edge_count * (1 - edge_direction.astype(int))
# create combined-index for input edges of the map
neighbour_edges_direction = neighbour_node_other < neighbour_node_self
in_edge_combined_index = to_directed(neighbour_edges, neighbour_edges_direction, edge_count)
# find the map from every (combined index) edge to the next edge in (counter-)-clockwise direction
edges_map = neighbours.roll() if counter_clockwise else neighbours.roll(-1)
# find the combined index of the output edges of the map
out_edge_directed = to_directed(neighbour_edges, ~neighbour_edges_direction, edge_count)[edges_map]
# sort the edge-map based on the (combined index of the) input edges
sorted_out_edge_directed = out_edge_directed[np.argsort(in_edge_combined_index)]
return sorted_out_edge_directed
def _construct_l_cycles(self):
"""
Computes l-cycles of the embedded graph. This is done using repeated iteration over the
one-to-one map over directed edges computed with _get_edge_map(). It starts with all
edges pointing from lowest to highest node to ensure all cycles are found.
Used to generate cycles by repeating indexing: e_(i+1) = map[e_i]
map: np.arange(2*E) -> sorted_out_edge_directed
Output:
sorted_out_edge_directed (2*E,) in range(2*E)
"""
map = self._get_edge_map()
edge_ids = np.arange(self.edge_count())
cycles = -np.ones((3, self.edge_count()), dtype=int)
cycles[0, :] = edge_ids
cycle_lengths = np.ones(self.edge_count(), dtype=int)
current_cycle_length, out_cycles, out_cycle_lengths = 1, np.zeros(0, dtype=int), np.zeros(0, dtype=int)
# iteration doing counter-clockwise walks through the graphs starting from each junction
while len(edge_ids) > 0:
edge_ids = map[edge_ids]
is_terminated = edge_ids == cycles[0, :]
if np.any(is_terminated):
terminated_cycles = cycles[:current_cycle_length, is_terminated].T
out_cycle_lengths, out_cycles = self._store_terminated_cycles(terminated_cycles, out_cycle_lengths, out_cycles)
cycles, cycle_lengths = cycles[:, ~is_terminated], cycle_lengths[~is_terminated]
edge_ids = edge_ids[~is_terminated]
cycles[current_cycle_length, :] = edge_ids
current_cycle_length += 1
if cycles.shape[0] == (current_cycle_length):
cycles = np.append(cycles, -np.ones(cycles.shape, dtype=int), axis=0)
return out_cycles, out_cycle_lengths
def _store_terminated_cycles(self, terminated_cycles, out_cycle_lengths, out_cycles):
# roll cycles until the lowest node index is in the first column (so its easier to remove duplicate cycles)
cycle_cnt, cycle_len = terminated_cycles.shape
cycles = terminated_cycles[np.arange(cycle_cnt)[:, None], (
np.arange(cycle_len) + np.argmin(terminated_cycles, axis=1)[:, None]) % cycle_len]
_, idx = np.unique(cycles[:, 0], return_index=True)
cycles = cycles[idx, :]
out_cycle_lengths = np.append(out_cycle_lengths, cycle_len * np.ones(cycles.shape[0], dtype=int))
out_cycles = np.append(out_cycles, cycles.flatten())
return out_cycle_lengths, out_cycles
def _non_boundary_mask(self):
self._assign_boundary_faces()
out = np.ones(self.l_cycle_count(), dtype=bool)
out[self.boundary_face_indices] = False
return out
def _cycle_space_solve_for_integral_x(self, b):
"""
Solves the equation: A @ x = b (where A = cycle_matrix, without boundary faces).
If b is integral (contain only integers), the output array x will also be integral.
input: b (..., F)
output: x (..., E)
Notes:
- The equation is underdetermined, so the solution x is not unique.
"""
self._assign_boundary_faces()
if (self.get_num_components() != 1) or (not self.is_planar_embedding()):
raise ValueError("only implemented for single component planar embedding")
E, F = self.edge_count(), self.face_count()
b = np.array(b, dtype=np.double)
b_shape = list(b.shape)
b = b.reshape(-1, F)
b_tally = b.copy()
# insert boundary face in b_tally
b_idx = self.boundary_face_indices[0]
b_tally = np.concatenate((b_tally[:, :b_idx], np.zeros((b_tally.shape[0], 1)), b_tally[:, b_idx:]), axis=1)
# do depth first search, resulting in cur (current node of tree) and prev (parent node of cur)
A = self.l_cycle_matrix()
cur, predecessor = scipy.sparse.csgraph.depth_first_order(A @ A.T, b_idx)
prev = predecessor[cur]
prev[0] = -1
# find map of edge between pair of faces. signs returns +1 if face1 counterclockwise passes resulting edge in its own direction.
juncs, orientation_in_face_1 = self.get_common_edge_of_l_cycles(cur, prev, return_orientation=True)
sgns = (-1 + 2 * orientation_in_face_1.astype(np.double))
# construct x at each edge by passing through the tree in reverse.
x = np.zeros((b.shape[0], E), dtype=b.dtype)
for i in reversed(range(F+1)):
if prev[i] >= 0:
b_tally[:, prev[i]] += b_tally[:, cur[i]]
x[:, juncs[i]] += b_tally[:, cur[i]] * sgns[i]
b_tally[:, cur[i]] = 0
# check if resulting x solves A @ x == b
b_shape[-1] = E
mask = self._non_boundary_mask()
if not np.allclose((A @ x.T)[mask, :], b.T):
raise ValueError("failed integral solve of cycle space linear problem")
# return x
return x.reshape(tuple(b_shape))
[docs]class EmbeddedSquareGraph(EmbeddedGraph):
"""
Generate Embedded graph for a square lattice.
Parameters
----------
count_x, count_y : int scalar
Node count of square lattice in x- and y-direction respectively.
x_scale, y_scale=1.0 : float scalar
Stretch factors for square lattice in x- and y-direction respectively.
Bottom-left node is at origin.
"""
def __init__(self, count_x, count_y, x_scale=1.0, y_scale=1.0):
y, x = np.mgrid[0:count_y, 0:count_x]
idx = np.arange(count_x * count_y).reshape(count_y, count_x)
n1 = np.concatenate((idx[:, 0:-1].ravel(), idx[0:-1, :].ravel()))
n2 = np.concatenate((idx[:, 1:].ravel(), idx[1:, :].ravel()))
super().__init__(x * x_scale, y * y_scale, n1, n2)
[docs]class EmbeddedHoneycombGraph(EmbeddedGraph):
"""
Generate Embedded graph for a honeycomb lattice.
Parameters
----------
count_x, count_y : int scalar
The amount of times the unit cell (with shape /¯\ and has 4 nodes) is
repeated in the x- and y-direction; generating a honeycomb lattice.
x_scale, y_scale=1.0 : float scalar
Stretch factors for honeycomb lattice in x- and y-direction respectively.
Bottom-left node is at origin.
"""
def __init__(self, count_x, count_y, x_scale=1.0, y_scale=1.0):
y, x = np.mgrid[0:count_y, 0:count_x]
x1, y1 = 3.0 * x, np.sqrt(3.0) * y
nodes_x = np.concatenate((x1, x1 + 0.5, x1 + 1.5, x1 + 2), axis=0).ravel()
nodes_y = np.concatenate((y1, y1 + np.sqrt(0.75), y1 + np.sqrt(0.75), y1), axis=0).ravel()
idx = np.arange(count_x * count_y).reshape(count_y, count_x)
s = count_x * count_y
nodes1 = (idx, idx[:-1, :]+s, idx+s, idx[:-1, :]+2*s, idx+2*s, idx[:, :-1]+3*s)
nodes2 = (idx+s, idx[1:, :], idx+2*s, idx[1:, :]+3*s, idx+3*s, idx[:, 1:])
nodes1 = np.concatenate(tuple([n1.flatten() for n1 in nodes1])).ravel()
nodes2 = np.concatenate(tuple([n2.flatten() for n2 in nodes2])).ravel()
# remove_node_ids = [0, idx[0, -1] + 3 * s]
# self.remove_nodes(remove_node_ids)
super().__init__(nodes_x * x_scale, nodes_y * y_scale, nodes1, nodes2)
[docs]class EmbeddedTriangularGraph(EmbeddedGraph):
"""
Generate Embedded graph for a triangular lattice.
Parameters
----------
count_x, count_y : int scalar
The amount of times the unit cell (with shape / and has 2 nodes) is
repeated in the x- and y-direction; generating a triangular lattice.
x_scale, y_scale=1.0 : float scalar
Stretch factors for triangular lattice in x- and y-direction respectively.
Bottom-left node is at origin.
"""
def __init__(self, count_x, count_y, x_scale=1.0, y_scale=1.0):
y, x = np.mgrid[0:count_y, 0:count_x]
x1, y1 = x, np.sqrt(3.0) * y
nodes_x = np.concatenate((x1, x1 + 0.5), axis=0)
nodes_y = np.concatenate((y1, y1 + np.sqrt(0.75)), axis=0)
idx = np.arange(count_x * count_y).reshape(count_y, count_x)
s = count_x * count_y
nodes1 = (idx, idx[:, :-1], idx[:-1, :]+s, idx[:-1, :-1]+s, idx[:, :-1]+s, idx[:, 1:])
nodes2 = (idx+s, idx[:, 1:], idx[1:, :], idx[1:, 1:], idx[:, 1:]+s, idx[:, :-1]+s)
nodes1 = np.concatenate(tuple([n1.flatten() for n1 in nodes1]))
nodes2 = np.concatenate(tuple([n2.flatten() for n2 in nodes2]))
super().__init__(nodes_x * x_scale, nodes_y * y_scale, nodes1, nodes2)