-
Notifications
You must be signed in to change notification settings - Fork 1
/
circles_graph.py
84 lines (68 loc) · 2.84 KB
/
circles_graph.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
"""
Produce graph for circles around a focus person.
"""
import argparse
from collections.abc import Collection
from pathlib import Path
import networkx as nx
import circles_tools
import data_reader
from data_reader import UserNum
import graph_tools
import utils
NodeType = str
EdgeType = tuple[NodeType, NodeType]
def make_bipartite(db : data_reader.Database, people : Collection[UserNum]
) -> tuple[list[NodeType], list[EdgeType]]:
# Like graph_make_bipartite ... but only for a subset of people.
people_nodes : frozenset[NodeType] = frozenset(str(p) for p in people)
union_nodes : set[NodeType] = set()
edges : set[EdgeType] = set()
for person in people:
# Add union node for parents if they are known.
parents = db.parents_of(person)
if parents:
union = "Union/" + "/".join(str(p) for p in sorted(parents))
union_nodes.add(union)
edges.add((str(person), union))
# Make sure parents are also connected to the union.
for parent in parents:
if parent in people_nodes:
edges.add((str(parent), union))
# Add union node for all "partners" (spouses / coparents).
for partner in db.partners_of(person):
if partner in people_nodes:
union = "Union/" + "/".join(str(p) for p in sorted([person, partner]))
union_nodes.add(union)
edges.add((str(person), union))
edges.add((str(partner), union))
# Note: We don't explicitly connect all children here.
# If they are in `people`, they will be connected above.
return (sorted(people_nodes) + sorted(union_nodes),
sorted(edges))
def main():
parser = argparse.ArgumentParser()
parser.add_argument("focus_id")
parser.add_argument("--num-circles", "-n", type=int, default=7)
parser.add_argument("--version", help="Data version (defaults to most recent).")
args = parser.parse_args()
utils.log("Starting")
db = data_reader.Database(args.version)
circles = circles_tools.load_circles(db, args.focus_id, args.num_circles)
# Flat to set of people in all circles.
people = frozenset(x for ring in circles for x in ring)
utils.log(f"Loaded {args.num_circles} circles around {args.focus_id}: {len(people):_} people total")
nodes, edges = make_bipartite(db, people)
graph = graph_tools.make_graph(nodes, edges)
utils.log(f"Created graph with {graph.number_of_nodes():_} nodes and {graph.number_of_edges():_} edges")
graph_file = Path("results", "circles", "graph", f"{args.focus_id}.{args.num_circles}")
graph_file.parent.mkdir(parents=True, exist_ok=True)
filename = graph_tools.write_graph(graph, graph_file)
utils.log(f"Wrote: {str(filename)}")
# print("Cycles:")
# for i, cycle in enumerate(nx.simple_cycles(graph)):
# print(f"Cycle {i:4d} {len(cycle):4d}")
# for x in cycle:
# if isinstance(x, int):
# print(" *", db.num2id(x))
main()