diff --git a/stream/cost_model/cost_model.py b/stream/cost_model/cost_model.py index a7af469..428910a 100644 --- a/stream/cost_model/cost_model.py +++ b/stream/cost_model/cost_model.py @@ -1,6 +1,6 @@ from zigzag.datatypes import LayerOperand -from stream.cost_model.scheduler import schedule_graph +from stream.cost_model.scheduler import Schedule from stream.hardware.architecture.accelerator import Accelerator from stream.visualization.memory_usage import plot_memory_usage from stream.visualization.schedule import plot_timeline_brokenaxes @@ -49,22 +49,24 @@ def run(self): manager. This assumes each node in the graph has an energy and runtime of the core to which they are allocated to. """ - results = schedule_graph( - self.workload, - self.accelerator, - operands_to_prefetch=self.operands_to_prefetch, + schedule = Schedule( + G=self.workload, + accelerator=self.accelerator, scheduling_order=self.scheduling_order, + operands_to_prefetch=self.operands_to_prefetch, ) - self.latency = results[0] - self.total_cn_onchip_energy = results[1] - self.total_cn_offchip_link_energy = results[2] - self.total_cn_offchip_memory_energy = results[3] - self.total_eviction_to_offchip_link_energy = results[4] - self.total_eviction_to_offchip_memory_energy = results[5] - self.total_sink_layer_output_offchip_link_energy = results[6] - self.total_sink_layer_output_offchip_memory_energy = results[7] - self.total_core_to_core_link_energy = results[8] - self.total_core_to_core_memory_energy = results[9] + schedule.run() + + self.latency = schedule.latency + self.total_cn_onchip_energy = schedule.total_cn_onchip_energy + self.total_cn_offchip_link_energy = schedule.total_cn_offchip_link_energy + self.total_cn_offchip_memory_energy = schedule.total_cn_offchip_memory_energy + self.total_eviction_to_offchip_link_energy = schedule.total_eviction_to_offchip_link_energy + self.total_eviction_to_offchip_memory_energy = schedule.total_eviction_to_offchip_memory_energy + self.total_sink_layer_output_offchip_link_energy = schedule.total_sink_layer_output_offchip_link_energy + self.total_sink_layer_output_offchip_memory_energy = schedule.total_sink_layer_output_offchip_memory_energy + self.total_core_to_core_link_energy = schedule.total_core_to_core_link_energy + self.total_core_to_core_memory_energy = schedule.total_core_to_core_memory_energy self.energy = ( self.total_cn_onchip_energy diff --git a/stream/cost_model/scheduler.py b/stream/cost_model/scheduler.py index e82f25e..32917a7 100644 --- a/stream/cost_model/scheduler.py +++ b/stream/cost_model/scheduler.py @@ -15,520 +15,524 @@ logger = logging.getLogger(__name__) -def initialize_priorities(workload: ComputationNodeWorkload, accelerator: "Accelerator"): - for n in workload.node_list: - for tensor in n.operand_tensors.values(): - tensor.initialize_instance_priorities(workload, n, accelerator) - - -def initialize_offchip_tensors(workload: ComputationNodeWorkload, accelerator: "Accelerator"): - offchip_core_id = accelerator.offchip_core_id - assert offchip_core_id is not None, "No offchip core found for this accelerator" - offchip_core = accelerator.get_core(offchip_core_id) - offchip_top_instances = accelerator.get_top_instances_of_core(offchip_core_id) - for n in workload.node_list: - for op, tensor in n.operand_tensors.items(): - # For constant operands or inputs of first node - if op in n.constant_operands or (op != Constants.OUTPUT_LAYER_OP and len(workload.in_edges(n)) == 0): - if not any( - ( - accelerator.contains_tensor(tensor, offchip_top_instance) - for 
offchip_top_instance in offchip_top_instances - ) - ): - memory_op = n.memory_operand_links.layer_to_mem_op(op) - accelerator.spawn( - tensor=tensor, - core=offchip_core, - memory_op=memory_op, - initial_timestep=0, - available_timestep=0, - ) - - -def prefetch_constant_operands( - G: ComputationNodeWorkload, accelerator: "Accelerator", operands_to_prefetch: list[LayerOperand] -): - total_cn_offchip_link_energy = 0 - total_cn_offchip_memory_energy = 0 - total_eviction_to_offchip_link_energy = 0 - total_eviction_to_offchip_memory_energy = 0 - for n in G.node_list: - for op, tensor in n.operand_tensors.items(): - if op in n.constant_operands and op in operands_to_prefetch: - core_allocation = n.chosen_core_allocation - assert core_allocation is not None, "Core should be allocated" - memory_op = n.memory_operand_links.layer_to_mem_op(op) - if not accelerator.contains_tensor(tensor, core_allocation): - ( - _, - transfer_link_energy_cost, - transfer_memory_energy_cost, - eviction_link_energy_cost, - eviction_memory_energy_cost, - came_from_offchip, - ) = accelerator.transfer_tensor_to_core(tensor, core_allocation, memory_op, []) - assert came_from_offchip - total_cn_offchip_link_energy += transfer_link_energy_cost - total_cn_offchip_memory_energy += transfer_memory_energy_cost - total_eviction_to_offchip_link_energy += eviction_link_energy_cost - total_eviction_to_offchip_memory_energy += eviction_memory_energy_cost - return ( - total_cn_offchip_link_energy, - total_cn_offchip_memory_energy, - total_eviction_to_offchip_link_energy, - total_eviction_to_offchip_memory_energy, - ) - - -def get_best_candidate( - candidates: list[tuple[int, ComputationNode]], scheduling_order: list[tuple[int, int]] -) -> tuple[ComputationNode, int]: - # If this core doesn't have any candidates, continue to the next core - if not candidates: - raise ValueError("There are no candidates to schedule.") - preds_ends, cn_candidates = zip(*candidates) - cn_candidates: list[ComputationNode] - idxs = [scheduling_order.index((n.id, n.sub_id)) for n in cn_candidates] - best_candidate_idx = idxs.index(min(idxs)) - best_candidate = cn_candidates[best_candidate_idx] - preds_end = preds_ends[best_candidate_idx] - # Remove the candidate from the list of candidates - del candidates[best_candidate_idx] - return best_candidate, preds_end - - -def get_tensors_needed_for_node(node: ComputationNode, G: ComputationNodeWorkload): - """Determine all the tensors needed to compute a node. - The node might need multiple outputs from previous nodes, depending on the graph. - - Args: - node (ComputationNode): The node to be computed. - G : The graph of all nodes. - - Returns: - tuple: A tuple of tensors and a tuple of memory operands for the node. 
- """ - tensors_this_candidate_needs: list[Tensor] = [] - tensors_operands: list[MemoryOperand] = [] - # Constant operands - for layer_op in node.constant_operands: - memory_op = node.memory_operand_links.layer_to_mem_op(layer_op) - if memory_op in node.too_large_operands: - continue - tensors_this_candidate_needs.append(node.operand_tensors[layer_op]) - tensors_operands.append(memory_op) - # Non-constant operands - for pred, node, edge_data in sorted(G.in_edges(node, data=True), key=itemgetter(0)): - if pred.id == node.id: - continue # Skip if predecessor was from the same layer (intra-edge) - consumer_layer_op: LayerOperand = edge_data["operand"] - consumer_memory_op = node.memory_operand_links.layer_to_mem_op(consumer_layer_op) - if consumer_memory_op in node.too_large_operands: - continue # Skip if tensor will be fetched fromm offchip throughout computation - pred_output_tensor = pred.operand_tensors[pred.output_operand] - tensors_this_candidate_needs.append(pred_output_tensor) - tensors_operands.append(consumer_memory_op) - if tensors_this_candidate_needs: - # Sort these tensors based on their earliest possible transfer time - tensors_this_candidate_needs, tensors_operands = zip( - *sorted(zip(tensors_this_candidate_needs, tensors_operands)) - ) - return tensors_this_candidate_needs, tensors_operands - - -def clear_memories( - accelerator: "Accelerator", - core: Core, - memory_operands: list[MemoryOperand], - timestep: int, - exceptions: list[Tensor] = [], - transfer_bandwidth_fraction: float = 1, -): - total_eviction_to_offchip_link_energy = 0 - total_eviction_to_offchip_memory_energy = 0 - for too_large_operand in memory_operands: - ( - timestep, - eviction_link_energy_cost, - eviction_memory_energy_cost, - ) = accelerator.remove_all( - core=core, - memory_operand=too_large_operand, - timestep=timestep, - exceptions=exceptions, - transfer_bandwidth_fraction=transfer_bandwidth_fraction, - write_back_to_offchip=True, - ) - total_eviction_to_offchip_link_energy += eviction_link_energy_cost - total_eviction_to_offchip_memory_energy += eviction_memory_energy_cost - return ( - total_eviction_to_offchip_link_energy, - total_eviction_to_offchip_memory_energy, - timestep, - ) - - -def decrease_priority( - tensors: list[Tensor], - tensors_operands: list[MemoryOperand], - accelerator: "Accelerator", - node: ComputationNode, -): - for tensor_used_by_node, tensor_memory_operand in zip(tensors, tensors_operands): - # TODO: tensor_memory_operand will be 'O' for activation tensors. - # TODO: If the memory between input and output is not shared, this will give a wrong instance. 
- assert node.chosen_core_allocation is not None - top_instance = accelerator.get_top_instance_of_core(node.chosen_core_allocation, tensor_memory_operand) - tensor_used_by_node.instance_priorities[top_instance] -= 1 - - -def check_for_removal( - tensors: list[Tensor], - accelerator: "Accelerator", - G: ComputationNodeWorkload, - timestep: int, - transfer_bandwidth_fraction: float = 1, -): - offchip_core_id = accelerator.offchip_core_id - for tensor_used_by_node in tensors: - if tensor_used_by_node.get_total_priority() == 0: - instances_storing_tensor, _ = accelerator.memory_manager.find_tensor_in_top_instances(tensor_used_by_node) - for instance_storing_tensor in instances_storing_tensor: - core_ids_of_instance = [ - core.id for core in accelerator.memory_manager.cores_per_top_instance[instance_storing_tensor] - ] - # If this tensor is an output tensor, find all nodes that needed it - # to get an accurate timestep at which it can be removed - timestep_for_removal = timestep - if tensor_used_by_node.layer_operand == tensor_used_by_node.origin.output_operand: - origin = tensor_used_by_node.origin - if offchip_core_id in core_ids_of_instance: - # If wanting to discard it from offchip, look at the max end time across all successors - nodes_that_needed_tensor = [n for n in G.successors(origin) if n.id != origin.id] - else: - # If discarding it from a regular core, look at the max end time successors that used it from - # that instance - nodes_that_needed_tensor = [ - n - for n in G.successors(origin) - if n.chosen_core_allocation in core_ids_of_instance and n.id != origin.id - ] - end_times = [n.end for n in nodes_that_needed_tensor if n.end is not None] - max_end_time = max(end_times, default=timestep_for_removal) - # assert max_end_time != -1, "There should be at least one successor." - timestep_for_removal = max_end_time - - # Get a core tied to the top_instance we want to remove it on. - core = accelerator.memory_manager.cores_per_top_instance[instance_storing_tensor][0] - accelerator.remove( - tensor_used_by_node, - core, - tensor_used_by_node.memory_operand, - timestep_for_removal, - transfer_bandwidth_fraction=transfer_bandwidth_fraction, +class Schedule: + def __init__( + self, + G: ComputationNodeWorkload, + accelerator: "Accelerator", + scheduling_order: list[tuple[int, int]], + cores_idle_from: dict[int, int] | None = None, + operands_to_prefetch: list[LayerOperand] = [], + ): + """ + Args: + G: Graph containing the nodes to be scheduled. + accelerator: The accelerator to schedule the nodes on. + scheduling_order: + cores_idle_from: A dict containing for each core_id its start offset. Defaults to None. + operands_to_prefetch: The layer operands that should be prefetched at the start of the schedule. + """ + self.G = G + self.accelerator = accelerator + self.scheduling_order = scheduling_order + self.operands_to_prefetch = operands_to_prefetch + + core_ids = set(n.chosen_core_allocation for n in G.node_list) + assert None not in core_ids, "Not all nodes have core allocation. Insert SetFixedAllocationPerformanceStage." 
+ self.all_core_ids: list[int] = sorted(list(core_ids)) # type: ignore + self.cores_idle_from = cores_idle_from if cores_idle_from else {core_id: 0 for core_id in self.all_core_ids} + + # Initialize the schedule results + self.latency = 0 + self.total_cn_onchip_energy = 0 + self.total_cn_offchip_link_energy = 0 + self.total_cn_offchip_memory_energy = 0 + self.total_eviction_to_offchip_link_energy = 0 + self.total_eviction_to_offchip_memory_energy = 0 + self.total_sink_layer_output_offchip_link_energy = 0 + self.total_sink_layer_output_offchip_memory_energy = 0 + self.total_core_to_core_link_energy = 0 + self.total_core_to_core_memory_energy = 0 + + # Remains constant throughout the scheduling + self.sink_layer_nodes = self.get_sink_layer_nodes() + self.offchip_core = accelerator.get_offchip_core() + self.nb_graph_nodes = G.number_of_nodes() + + # Initialize bookkeeping + self.nb_scheduled_nodes = 0 + self.scheduled_nodes: set[ComputationNode] = set() + self.candidates = self.get_initial_candidates() + self.initialize_tensor_priorities() + self.initialize_offchip_tensors() + + def get_initial_candidates(self): + """Put the very first nodes of a layer that doesn't have any incoming edges as the first candidates""" + candidates: list[tuple[int, ComputationNode]] = [] + for source_node in (n for n, d in self.G.in_degree() if d == 0): + core_allocation = source_node.chosen_core_allocation + candidates.append((self.cores_idle_from[core_allocation], source_node)) # type: ignore + return candidates + + def get_sink_layer_nodes(self): + """Get all the nodes with no successors that produce final outputs, used for off-loading final outputs""" + sink_layer_ids = self.G.get_sink_layer_ids() + sink_layer_nodes = set((n for n in self.G.node_list if (n.id in sink_layer_ids) and n.produces_final_output)) + return sink_layer_nodes + + def initialize_tensor_priorities(self): + """Initialize the memory instance priorities for each tensor in the workload.""" + for n in self.G.node_list: + for tensor in n.operand_tensors.values(): + tensor.initialize_instance_priorities(self.G, n, self.accelerator) + + def initialize_offchip_tensors(self): + """Add the constant operand tensors of all nodes to the off-chip initially.""" + offchip_top_instances = self.accelerator.get_top_instances_of_core(self.offchip_core) + for n in self.G.node_list: + for op, tensor in n.operand_tensors.items(): + # For constant operands or inputs of first node + if op in n.constant_operands or (op != Constants.OUTPUT_LAYER_OP and len(self.G.in_edges(n)) == 0): + if not any( + ( + self.accelerator.contains_tensor(tensor, offchip_top_instance) + for offchip_top_instance in offchip_top_instances + ) + ): + memory_op = n.memory_operand_links.layer_to_mem_op(op) + self.accelerator.spawn( + tensor=tensor, + core=self.offchip_core, + memory_op=memory_op, + initial_timestep=0, + available_timestep=0, + ) + + def run(self): + nb_scheduled_nodes = 0 + done = False + + self.prefetch_constant_operands() + + while not done: + best_candidate, preds_end = self.pop_best_candidate() + tensors_this_candidate_needs, tensors_operands = self.get_tensors_needed_for_node(best_candidate) + core = self.get_core_for_node(best_candidate) + transfer_bw_fraction = self.get_transfer_bandwidth_fraction(best_candidate) + + # Step 0: get the start time: when core is available or predecessors finished + self.sync_cores_idle_from(best_candidate) + core_idle_from = self.cores_idle_from[core.id] + timestep = max(core_idle_from, preds_end) + + # Step 1: for operands that are too 
large to store in the core's memory, clear the memory so ZigZag can + # optimize the loop ordering using the full memory size + transfer_complete_timestep = self.clear_memories( + core=core, + memory_operands=best_candidate.too_large_operands, + timestep=timestep, + exceptions=tensors_this_candidate_needs, + transfer_bandwidth_fraction=transfer_bw_fraction, + ) + timestep = transfer_complete_timestep + + # Step 2: Transfer the tensors needed for this node to the core (from off-chip or from another core) + for tensor, tensor_operand in zip(tensors_this_candidate_needs, tensors_operands): + transfer_complete_timestep = self.transfer_tensor_to_core( + tensor=tensor, + tensor_operand=tensor_operand, + receiving_core=core, + non_evictable_tensors=tensors_this_candidate_needs, + earliest_t=core_idle_from, + transfer_bandwidth_fraction=transfer_bw_fraction, ) + timestep = max(timestep, transfer_complete_timestep) + + # Step 3: make space for the output tensor of this node + output_layer_operand = best_candidate.output_operand + output_memory_operand = best_candidate.memory_operand_links.layer_to_mem_op(output_layer_operand) + output_tensor = best_candidate.operand_tensors[output_layer_operand] + core_to_add_output_to = ( + self.offchip_core if output_memory_operand in best_candidate.too_large_operands else core + ) + transfer_complete_timestep = self.make_space_for_tensor( + output_tensor, + core_to_add_output_to, + output_memory_operand, + timestep, + tensors_this_candidate_needs, + transfer_bandwidth_fraction=transfer_bw_fraction, + ) + timestep = transfer_complete_timestep + + # Step 4: If any operands are too large to store in memory, find a window and block off-chip links for the + # runtime duration + blocking_can_start_timestep = self.accelerator.block_offchip_links( + too_large_operands=best_candidate.too_large_operands, + core_id=core.id, + start_timestep=timestep, + duration=best_candidate.get_runtime(), + cn=best_candidate, + ) + timestep = blocking_can_start_timestep + + # Step 5: Register the scheduling decision for this node and spawn the output tensor + node_end_timestep = self.register_scheduled_node( + node=best_candidate, + start_time=timestep, + output_tensor=output_tensor, + output_memory_operand=output_memory_operand, + core_to_add_output_to=core_to_add_output_to, + core_to_run_on=core, + ) + timestep = node_end_timestep + + # Step 6: manage memory usage when the node ends + self.decrease_priority(tensors_this_candidate_needs, tensors_operands, best_candidate) + self.check_for_removal(tensors_this_candidate_needs, timestep, transfer_bw_fraction) + self.remove_sink_node_tensor( + node=best_candidate, + tensor_to_remove=output_tensor, + core_to_remove_from=core, + timestep=timestep, + transfer_bandwidth_fraction=transfer_bw_fraction, + ) - -def sync_cores_idle_from( - cores_idle_from: dict[int, int], - G: ComputationNodeWorkload, - best_candidate: ComputationNode, - scheduling_order: list[tuple[int, int]], -): - """ - Sync the cores_idle_from dict values if the best candidate is the first node of a layer and we detect layer-by-layer - execution. The layer-by-layer execution is detected through the scheduling_order. 
- """ - # Get the predecessor ids of the best_candidate from the workload graph G - predecessor_ids = [pred.id for pred in G.predecessors(best_candidate) if pred.id != best_candidate.id] - predecessor_idxs = [i for i in range(len(scheduling_order)) if scheduling_order[i][0] in predecessor_ids] - - best_candidate_idx = scheduling_order.index((best_candidate.id, best_candidate.sub_id)) - if scheduling_order[best_candidate_idx - 1][0] != best_candidate.id and all( - (i < best_candidate_idx for i in predecessor_idxs) + # Step 7: finish this round + self.extend_candidates(best_candidate) + nb_scheduled_nodes += 1 + done = nb_scheduled_nodes == self.nb_graph_nodes + + self.latency = self.get_total_latency() + return self.latency + + def get_transfer_bandwidth_fraction(self, node: ComputationNode): + """Get the fraction of the off-chip bandwidth to be used for the tensor transfers related to this node""" + return 1 / node.get_total_inter_core_splits() + + def prefetch_constant_operands(self): + """Load the `operands_to_prefetch` to the cores they belong to and log the energy costs.""" + for n in self.G.node_list: + for op, tensor in n.operand_tensors.items(): + if op in n.constant_operands and op in self.operands_to_prefetch: + core_allocation = n.chosen_core_allocation + assert core_allocation is not None, "Core should be allocated" + memory_op = n.memory_operand_links.layer_to_mem_op(op) + if not self.accelerator.contains_tensor(tensor, core_allocation): + ( + _, + transfer_link_energy_cost, + transfer_memory_energy_cost, + eviction_link_energy_cost, + eviction_memory_energy_cost, + came_from_offchip, + ) = self.accelerator.transfer_tensor_to_core(tensor, core_allocation, memory_op, []) + assert came_from_offchip + self.total_cn_offchip_link_energy += transfer_link_energy_cost + self.total_cn_offchip_memory_energy += transfer_memory_energy_cost + self.total_eviction_to_offchip_link_energy += eviction_link_energy_cost + self.total_eviction_to_offchip_memory_energy += eviction_memory_energy_cost + + def pop_best_candidate(self) -> tuple[ComputationNode, int]: + """Get the best candidate node to schedule next, given the selection priority. Remove that candidate from the + list of candidates and return it.""" + if not self.candidates: + raise ValueError("There are no candidates to schedule.") + preds_ends, cn_candidates = zip(*self.candidates) + cn_candidates: list[ComputationNode] + idxs = [self.scheduling_order.index((n.id, n.sub_id)) for n in cn_candidates] + best_candidate_idx = idxs.index(min(idxs)) + best_candidate = cn_candidates[best_candidate_idx] + preds_end = preds_ends[best_candidate_idx] + # Remove the candidate from the list of candidates + del self.candidates[best_candidate_idx] + return best_candidate, preds_end + + def sync_cores_idle_from( + self, + best_candidate: ComputationNode, ): - # If the best_candidate is the first node of a layer and all nodes of predecessor layers have been scheduled - # Sync the cores_idle_from dict - max_idle_time = max(cores_idle_from.values()) - for core_id in cores_idle_from: - cores_idle_from[core_id] = max_idle_time - - -def schedule_graph( - G: ComputationNodeWorkload, - accelerator: "Accelerator", - scheduling_order: list[tuple[int, int]], - cores_idle_from: dict[int, int] | None = None, - operands_to_prefetch: list[LayerOperand] = [], -) -> tuple[int, float, float, float, float, float, float, float, float, float]: - """Schedule the nodes of graph G across the cores in the system. - Each node should have a core_allocation and runtime set. 
- - Args: - G : Graph containing the nodes to be scheduled. - accelerator: The accelerator to schedule the nodes on. - scheduling_order: - cores_idle_from: A dict containing for each core_id its start offset. Defaults to None. - operands_to_prefetch: The layer operands that should be prefetched at the start of the schedule. - """ - # Initialize total link energy cost and memory energy costs - total_cn_onchip_energy = 0 - total_cn_offchip_link_energy = 0 - total_cn_offchip_memory_energy = 0 - total_eviction_to_offchip_link_energy = 0 - total_eviction_to_offchip_memory_energy = 0 - total_sink_layer_output_offchip_link_energy = 0 - total_sink_layer_output_offchip_memory_energy = 0 - total_core_to_core_link_energy = 0 - total_core_to_core_memory_energy = 0 - - core_ids = set(n.chosen_core_allocation for n in G.node_list) - assert ( - None not in core_ids - ), "Make sure all nodes have a core allocation. Insert SetFixedAllocationPerformanceStage." - all_core_ids: list[int] = sorted(list(core_ids)) # type: ignore - - if cores_idle_from is None: - # Make it 0 for all cores - cores_idle_from = {core_allocation: 0 for core_allocation in all_core_ids} - - nb_graph_nodes = G.number_of_nodes() - nb_scheduled_nodes = 0 - scheduled_nodes: set[ComputationNode] = set() - - # List that keeps all possible candidate nodes for each core. - candidates: list[tuple[int, ComputationNode]] = [] - - # Put the very first nodes of a layer that doesn't have any incoming edges as the first candidates - for source_node in (n for n, d in G.in_degree() if d == 0): - core_allocation = source_node.chosen_core_allocation - candidates.append((cores_idle_from[core_allocation], source_node)) # type: ignore - - # Get all the nodes with no successors that produce final outputs, used for off-loading final outputs - sink_layer_ids = G.get_sink_layer_ids() - sink_layer_nodes = set((n for n in G.node_list if (n.id in sink_layer_ids) and n.produces_final_output)) - - # Get the offchip core id and core - offchip_core_id = accelerator.offchip_core_id - assert offchip_core_id is not None - offchip_core = accelerator.get_core(offchip_core_id) - - # Schedule preparation: - # 1. Initialize the memory instance priorities for each tensor - initialize_priorities(G, accelerator) - # 2. Add the constant operand tensors of all nodes to the off-chip initially - initialize_offchip_tensors(G, accelerator) - # 3. 
Prefetch the constant operands that should be prefetched to their core - ( - prefetch_cn_offchip_link_energy, - prefetch_cn_offchip_memory_energy, - prefetch_eviction_to_offchip_link_energy, - prefetch_eviction_to_offchip_memory_energy, - ) = prefetch_constant_operands(G, accelerator, operands_to_prefetch) - total_cn_offchip_link_energy += prefetch_cn_offchip_link_energy - total_cn_offchip_memory_energy += prefetch_cn_offchip_memory_energy - total_eviction_to_offchip_link_energy += prefetch_eviction_to_offchip_link_energy - total_eviction_to_offchip_memory_energy += prefetch_eviction_to_offchip_memory_energy - - done = False - while not done: - # Get the best candidate given the selection priority - best_candidate, preds_end = get_best_candidate(candidates, scheduling_order) - # Sync cores_idle_from variable for layer-by-layer scheduling - sync_cores_idle_from(cores_idle_from, G, best_candidate, scheduling_order) - # Get the core this candidate will be scheduled on - core_id = best_candidate.chosen_core_allocation + """ + Sync the cores_idle_from dict values if the best candidate is the first node of a layer and we detect + layer-by-layer execution. The layer-by-layer execution is detected through the scheduling_order. + """ + # Get the predecessor ids of the best_candidate from the workload graph G + predecessor_ids = [pred.id for pred in self.G.predecessors(best_candidate) if pred.id != best_candidate.id] + predecessor_idxs = [ + i for i in range(len(self.scheduling_order)) if self.scheduling_order[i][0] in predecessor_ids + ] + + best_candidate_idx = self.scheduling_order.index((best_candidate.id, best_candidate.sub_id)) + if self.scheduling_order[best_candidate_idx - 1][0] != best_candidate.id and all( + (i < best_candidate_idx for i in predecessor_idxs) + ): + # If the best_candidate is the first node of a layer and all nodes of predecessor layers have been scheduled + # Sync the cores_idle_from dict + max_idle_time = max(self.cores_idle_from.values()) + for core_id in self.cores_idle_from: + self.cores_idle_from[core_id] = max_idle_time + + def get_core_for_node(self, node: ComputationNode): + """Get the core this candidate will be scheduled on""" + core_id = node.chosen_core_allocation assert core_id is not None - core = accelerator.get_core(core_id) - - # Fraction of the off-chip bandwidth to be used for the tensor transfers related to this node - transfer_bandwidth_fraction = 1 / best_candidate.get_total_inter_core_splits() - - # Earliest start time is when core is available or predecessors finished - core_idle_from = cores_idle_from[core_id] - start = max(core_idle_from, preds_end) - timestep = start - - # Step 0 - tensors_this_candidate_needs, tensors_operands = get_tensors_needed_for_node(best_candidate, G) - - # Step 1 - # There could be operands that are too large to store in the highest memory on the core - # The tensors stored in these memories should be evicted and potentially written back to off-chip - # Clear these memories (this might delay the potential start time if things have to written to off-chip) - ( - clear_link_energy, - clear_memory_energy, - memory_cleared_timestep, - ) = clear_memories( - accelerator=accelerator, - core=core, - memory_operands=best_candidate.too_large_operands, - timestep=timestep, - exceptions=tensors_this_candidate_needs, - transfer_bandwidth_fraction=transfer_bandwidth_fraction, - ) - total_eviction_to_offchip_link_energy += clear_link_energy - total_eviction_to_offchip_memory_energy += clear_memory_energy - timestep = 
memory_cleared_timestep - - # Step 2 - # The computation might need tensors that are currently not present in the core's memories - # We need to fetch these tensors from either off-chip or from the core where they are present - # Transfer these tensors from wherever they are currently residing to this core - for tensor, tensor_operand in zip(tensors_this_candidate_needs, tensors_operands): - # Transfer the tensor + return self.accelerator.get_core(core_id) + + def get_tensors_needed_for_node(self, node: ComputationNode): + """Determine all the tensors needed to compute a node. + The node might need multiple outputs from previous nodes, depending on the graph. + + Args: + node (ComputationNode): The node to be computed. + G : The graph of all nodes. + + Returns: + A tuple of tensors and a tuple of memory operands for the node. + """ + tensors_this_candidate_needs: list[Tensor] = [] + tensors_operands: list[MemoryOperand] = [] + # Constant operands + for layer_op in node.constant_operands: + memory_op = node.memory_operand_links.layer_to_mem_op(layer_op) + if memory_op in node.too_large_operands: + continue + tensors_this_candidate_needs.append(node.operand_tensors[layer_op]) + tensors_operands.append(memory_op) + # Non-constant operands + for pred, node, edge_data in sorted(self.G.in_edges(node, data=True), key=itemgetter(0)): + if pred.id == node.id: + continue # Skip if predecessor was from the same layer (intra-edge) + consumer_layer_op: LayerOperand = edge_data["operand"] + consumer_memory_op = node.memory_operand_links.layer_to_mem_op(consumer_layer_op) + if consumer_memory_op in node.too_large_operands: + continue # Skip if tensor will be fetched fromm offchip throughout computation + pred_output_tensor = pred.operand_tensors[pred.output_operand] + tensors_this_candidate_needs.append(pred_output_tensor) + tensors_operands.append(consumer_memory_op) + if tensors_this_candidate_needs: + # Sort these tensors based on their earliest possible transfer time + tensors_this_candidate_needs, tensors_operands = zip( + *sorted(zip(tensors_this_candidate_needs, tensors_operands)) + ) + return tensors_this_candidate_needs, tensors_operands + + def clear_memories( + self, + core: Core, + memory_operands: list[MemoryOperand], + timestep: int, + exceptions: list[Tensor] = [], + transfer_bandwidth_fraction: float = 1, + ): + for too_large_operand in memory_operands: ( - transfer_complete_timestep, - transfer_link_energy_cost, - transfer_memory_energy_cost, + timestep, eviction_link_energy_cost, eviction_memory_energy_cost, - came_from_offchip, - ) = accelerator.transfer_tensor_to_core( - tensor, - core_id, - tensor_operand, - tensors_this_candidate_needs, - earliest_t=core_idle_from, + ) = self.accelerator.remove_all( + core=core, + memory_operand=too_large_operand, + timestep=timestep, + exceptions=exceptions, transfer_bandwidth_fraction=transfer_bandwidth_fraction, + write_back_to_offchip=True, ) - # Update the possible start time of this node - timestep = max(timestep, transfer_complete_timestep) - # Add the energy costs to their respective trackers - if came_from_offchip: - total_cn_offchip_link_energy += transfer_link_energy_cost - total_cn_offchip_memory_energy += transfer_memory_energy_cost - else: - total_core_to_core_link_energy += transfer_link_energy_cost - total_core_to_core_memory_energy += transfer_memory_energy_cost - total_eviction_to_offchip_link_energy += eviction_link_energy_cost - total_eviction_to_offchip_memory_energy += eviction_memory_energy_cost - - # Step 3 - # Make space for 
the output tensor of this computation node and spawn it when evictions are complete - # If the output operand is in the too large operands, add it to off-chip, otherwise add it to this core's - # output memory - output_layer_operand = best_candidate.output_operand - output_memory_operand = best_candidate.memory_operand_links.layer_to_mem_op(output_layer_operand) - output_tensor = best_candidate.operand_tensors[output_layer_operand] - core_to_add_output_to = offchip_core if output_memory_operand in best_candidate.too_large_operands else core + self.total_eviction_to_offchip_link_energy += eviction_link_energy_cost + self.total_eviction_to_offchip_memory_energy += eviction_memory_energy_cost + return timestep + + def transfer_tensor_to_core( + self, + tensor: Tensor, + tensor_operand: MemoryOperand, + receiving_core: Core, + non_evictable_tensors: list[Tensor], + sending_core_id: int | None = None, + earliest_t: int = 0, + transfer_bandwidth_fraction: float = 1, + ): ( - evictions_complete_timestep, + transfer_complete_timestep, + transfer_link_energy_cost, + transfer_memory_energy_cost, eviction_link_energy_cost, eviction_memory_energy_cost, - ) = accelerator.make_space_for( - output_tensor, - core_to_add_output_to, - output_memory_operand, - timestep, - tensors_this_candidate_needs, + came_from_offchip, + ) = self.accelerator.transfer_tensor_to_core( + tensor, + receiving_core.id, + tensor_operand, + non_evictable_tensors, + earliest_t=earliest_t, transfer_bandwidth_fraction=transfer_bandwidth_fraction, ) - total_eviction_to_offchip_link_energy += eviction_link_energy_cost - total_eviction_to_offchip_memory_energy += eviction_memory_energy_cost - timestep = evictions_complete_timestep - - # Step 4 - # Check if we had any operands that were too large to store in the core's memory, block the relevant off-chip - # link for the duration - # This might again delay the execution if the offchip link was already blocked by another core - blocking_can_start_timestep = accelerator.block_offchip_links( - too_large_operands=best_candidate.too_large_operands, - core_id=core_id, - start_timestep=timestep, - duration=best_candidate.get_runtime(), - cn=best_candidate, + + # Add the energy costs to their respective trackers + if came_from_offchip: + self.total_cn_offchip_link_energy += transfer_link_energy_cost + self.total_cn_offchip_memory_energy += transfer_memory_energy_cost + else: + self.total_core_to_core_link_energy += transfer_link_energy_cost + self.total_core_to_core_memory_energy += transfer_memory_energy_cost + self.total_eviction_to_offchip_link_energy += eviction_link_energy_cost + self.total_eviction_to_offchip_memory_energy += eviction_memory_energy_cost + return transfer_complete_timestep + + def make_space_for_tensor( + self, + tensor: Tensor, + core: Core, + memory_operand: MemoryOperand, + timestep: int, + tensors_to_avoid_evicting: list[Tensor] = [], + transfer_bandwidth_fraction: float = 1, + ): + ( + evictions_complete_timestep, + eviction_link_energy_cost, + eviction_memory_energy_cost, + ) = self.accelerator.make_space_for( + tensor=tensor, + core=core, + memory_op=memory_operand, + timestep=timestep, + tensors_to_avoid_evicting=tensors_to_avoid_evicting, + transfer_bandwidth_fraction=transfer_bandwidth_fraction, ) - timestep = blocking_can_start_timestep + self.total_eviction_to_offchip_link_energy += eviction_link_energy_cost + self.total_eviction_to_offchip_memory_energy += eviction_memory_energy_cost + return evictions_complete_timestep + + def register_scheduled_node( + self, 
+ node: ComputationNode, + start_time: int, + output_tensor: Tensor, + output_memory_operand: MemoryOperand, + core_to_add_output_to: Core, + core_to_run_on: Core, + ): + """Spawn the output tensor and register the runtimes and energies of the node.""" - # Step 5 - # Spawn the output tensor and update the start and end time of the node - start = timestep - end = start + best_candidate.get_runtime() - accelerator.spawn( + end_time = start_time + node.get_runtime() + self.accelerator.spawn( output_tensor, core_to_add_output_to, output_memory_operand, - initial_timestep=start, - available_timestep=end, - ) - best_candidate.set_start(start) - best_candidate.set_end(end) - cores_idle_from[core_id] = end - - # Add the computation energy of running this node - total_cn_onchip_energy += best_candidate.get_onchip_energy() - total_cn_offchip_memory_energy += best_candidate.get_offchip_energy() - - # Add this node to the scheduled nodes - scheduled_nodes.add(best_candidate) - - # Step 6 - # Memory usage: When the node ends: - # Decrease the priority of all the tensors this node used - decrease_priority(tensors_this_candidate_needs, tensors_operands, accelerator, best_candidate) - # Remove the tensor if the priority is zero - check_for_removal( - tensors=tensors_this_candidate_needs, - accelerator=accelerator, - G=G, - timestep=end, - transfer_bandwidth_fraction=transfer_bandwidth_fraction, + initial_timestep=start_time, + available_timestep=end_time, ) - - # Step 7 - # Memory usage: When the node ends: - # If this node is a sink node (node that has no successors and that produces a final output), transfer final - # outputs to offchip - if best_candidate in sink_layer_nodes: + node.set_start(start_time) + node.set_end(end_time) + self.cores_idle_from[core_to_run_on.id] = end_time + self.scheduled_nodes.add(node) + + self.total_cn_onchip_energy += node.get_onchip_energy() + self.total_cn_offchip_memory_energy += node.get_offchip_energy() + return end_time + + def remove_sink_node_tensor( + self, + node: ComputationNode, + tensor_to_remove: Tensor, + core_to_remove_from: Core, + timestep: int, + transfer_bandwidth_fraction: float, + ): + """If this node is a sink node (node that has no successors and that produces a final output), transfer final + outputs to offchip + """ + if node in self.sink_layer_nodes: # Only push back sink node outputs if they're generated and stored on the core - if Constants.OUTPUT_MEM_OP not in best_candidate.too_large_operands: + if Constants.OUTPUT_MEM_OP not in node.too_large_operands: ( _, link_energy_cost, memory_energy_cost, - ) = accelerator.remove( - tensor=output_tensor, - core=core, - memory_op=output_tensor.memory_operand, - timestep=end, + ) = self.accelerator.remove( + tensor=tensor_to_remove, + core=core_to_remove_from, + memory_op=tensor_to_remove.memory_operand, + timestep=timestep, transfer_bandwidth_fraction=transfer_bandwidth_fraction, write_back_to_offchip=True, ) - total_sink_layer_output_offchip_link_energy += link_energy_cost - total_sink_layer_output_offchip_memory_energy += memory_energy_cost + self.total_sink_layer_output_offchip_link_energy += link_energy_cost + self.total_sink_layer_output_offchip_memory_energy += memory_energy_cost + + def decrease_priority( + self, + tensors: list[Tensor], + tensors_operands: list[MemoryOperand], + node: ComputationNode, + ): + for tensor_used_by_node, tensor_memory_operand in zip(tensors, tensors_operands): + # TODO: tensor_memory_operand will be 'O' for activation tensors. 
+ # TODO: If the memory between input and output is not shared, this will give a wrong instance. + assert node.chosen_core_allocation is not None + top_instance = self.accelerator.get_top_instance_of_core(node.chosen_core_allocation, tensor_memory_operand) + tensor_used_by_node.instance_priorities[top_instance] -= 1 + + def check_for_removal( + self, + tensors: list[Tensor], + timestep: int, + transfer_bandwidth_fraction: float = 1, + ): + """Remove the tensor from the core if its priority is zero.""" + for tensor_used_by_node in tensors: + if tensor_used_by_node.get_total_priority() == 0: + instances_storing_tensor, _ = self.accelerator.memory_manager.find_tensor_in_top_instances( + tensor_used_by_node + ) + for instance_storing_tensor in instances_storing_tensor: + core_ids_of_instance = [ + core.id + for core in self.accelerator.memory_manager.cores_per_top_instance[instance_storing_tensor] + ] + # If this tensor is an output tensor, find all nodes that needed it + # to get an accurate timestep at which it can be removed + timestep_for_removal = timestep + if tensor_used_by_node.layer_operand == tensor_used_by_node.origin.output_operand: + origin = tensor_used_by_node.origin + if self.offchip_core.id in core_ids_of_instance: + # If wanting to discard it from offchip, look at the max end time across all successors + nodes_that_needed_tensor = [n for n in self.G.successors(origin) if n.id != origin.id] + else: + # If discarding it from a regular core, look at the max end time successors that used it from + # that instance + nodes_that_needed_tensor = [ + n + for n in self.G.successors(origin) + if n.chosen_core_allocation in core_ids_of_instance and n.id != origin.id + ] + end_times = [n.end for n in nodes_that_needed_tensor if n.end is not None] + max_end_time = max(end_times, default=timestep_for_removal) + # assert max_end_time != -1, "There should be at least one successor." + timestep_for_removal = max_end_time + + # Get a core tied to the top_instance we want to remove it on. 
+ core = self.accelerator.memory_manager.cores_per_top_instance[instance_storing_tensor][0] + self.accelerator.remove( + tensor_used_by_node, + core, + tensor_used_by_node.memory_operand, + timestep_for_removal, + transfer_bandwidth_fraction=transfer_bandwidth_fraction, + ) - # Step 8 - # For each successor of this node, check if all of its predecessors have been scheduled - for successor in sorted(G.successors(best_candidate)): - if all((pred in scheduled_nodes for pred in G.predecessors(successor))): + def extend_candidates(self, node: ComputationNode): + """For each successor of this node, check if all of its predecessors have been scheduled""" + for successor in sorted(self.G.successors(node)): + if all((pred in self.scheduled_nodes for pred in self.G.predecessors(successor))): preds_end = max( - (predecessor.end for predecessor in G.predecessors(successor)), + (predecessor.end for predecessor in self.G.predecessors(successor)), default=0, ) - candidates.append((preds_end, successor)) - - # Increment the number of scheduled nodes - nb_scheduled_nodes += 1 - done = nb_scheduled_nodes == nb_graph_nodes - - # Step 9 - # The total schedule latency is the max of all CN end times and the link end times - cns_end_time = max((n.end for n in G.node_list)) - links_end_time = max([event.end for event in accelerator.communication_manager.events], default=0) - latency = max(cns_end_time, links_end_time) - - return ( - latency, - total_cn_onchip_energy, - total_cn_offchip_link_energy, - total_cn_offchip_memory_energy, - total_eviction_to_offchip_link_energy, - total_eviction_to_offchip_memory_energy, - total_sink_layer_output_offchip_link_energy, - total_sink_layer_output_offchip_memory_energy, - total_core_to_core_link_energy, - total_core_to_core_memory_energy, - ) + self.candidates.append((preds_end, successor)) + + def get_total_latency(self): + """The total schedule latency is the max of all CN end times and the link end times""" + cns_end_time = max((n.end for n in self.G.node_list)) + links_end_time = max([event.end for event in self.accelerator.communication_manager.events], default=0) + return max(cns_end_time, links_end_time) diff --git a/stream/hardware/architecture/accelerator.py b/stream/hardware/architecture/accelerator.py index 681e960..c95f28d 100644 --- a/stream/hardware/architecture/accelerator.py +++ b/stream/hardware/architecture/accelerator.py @@ -59,6 +59,11 @@ def get_core(self, core_id: int) -> Core: """ return self.cores.get_node_with_id(core_id) + def get_offchip_core(self) -> Core: + """Return the offchip core.""" + assert self.offchip_core_id, "This accelerator has no offchip core id." + return self.get_core(self.offchip_core_id) + @property def core_list(self) -> list[Core]: return list(self.cores.node_list) @@ -509,8 +514,9 @@ def has_shared_memory(self, core_id_a: int, core_id_b: int, mem_op_a: MemoryOper ) return top_memory_instance_a is top_memory_instance_b - def get_top_instances_of_core(self, core_id: int): - core = self.get_core(core_id) + def get_top_instances_of_core(self, core: int | Core) -> list[MemoryInstance]: + if isinstance(core, int): + core = self.get_core(core) top_instances = self.memory_manager.top_instances_per_core[core] return top_instances
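
For reference, a minimal usage sketch (not part of this diff) of how the refactored Schedule class is driven, mirroring the updated StreamCostModelEvaluation.run() in cost_model.py above. The workload, accelerator and scheduling_order arguments are assumed to be produced by the usual Stream stages and are placeholders here; only the Schedule constructor, run(), and the result attributes shown in the diff are relied on.

from zigzag.datatypes import LayerOperand

from stream.cost_model.scheduler import Schedule


def evaluate_schedule(workload, accelerator, scheduling_order, operands_to_prefetch: list[LayerOperand] = []):
    # Every node in the workload must already have a chosen_core_allocation
    # (e.g. set by SetFixedAllocationPerformanceStage), as asserted in Schedule.__init__.
    schedule = Schedule(
        G=workload,
        accelerator=accelerator,
        scheduling_order=scheduling_order,
        operands_to_prefetch=operands_to_prefetch,
    )
    latency = schedule.run()  # runs the scheduling loop and returns the total latency

    # The energy terms that schedule_graph() previously returned as a 10-tuple
    # are now attributes on the Schedule instance after run():
    energy = (
        schedule.total_cn_onchip_energy
        + schedule.total_cn_offchip_link_energy
        + schedule.total_cn_offchip_memory_energy
        + schedule.total_eviction_to_offchip_link_energy
        + schedule.total_eviction_to_offchip_memory_energy
        + schedule.total_sink_layer_output_offchip_link_energy
        + schedule.total_sink_layer_output_offchip_memory_energy
        + schedule.total_core_to_core_link_energy
        + schedule.total_core_to_core_memory_energy
    )
    return latency, energy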