Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Remove cycle handling #8679

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 65 additions & 46 deletions haystack/core/pipeline/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -936,61 +936,80 @@ def _distribute_output( # pylint: disable=too-many-positional-arguments

def _find_next_runnable_component(
self, components_inputs: Dict[str, Dict[str, Any]], waiting_queue: List[Tuple[str, Component]]
) -> Tuple[str, Component]:
) -> Optional[Tuple[str, Component]]:
"""
Finds the next Component that can be run and returns it.

:param components_inputs: The current state of the inputs divided by Component name
:param waiting_queue: Queue of Components waiting for input

:returns: The name and the instance of the next Component that can be run
:returns: None or the name and the instance of the next Component that can be run
"""
all_lazy_variadic = True
all_with_default_inputs = True

filtered_waiting_queue = []

for name, comp in waiting_queue:
if not _is_lazy_variadic(comp):
# Components with variadic inputs that are not greedy must be removed only if there's nothing else to
# run at this stage.
# We need to wait as long as possible to run them, so we can collect as most inputs as we can.
all_lazy_variadic = False

if not _has_all_inputs_with_defaults(comp):
# Components that have defaults for all their inputs must be treated the same identical way as we treat
# lazy variadic components. If there are only components with defaults we can run them.
# If we don't do this the order of execution of the Pipeline's Components will be affected cause we
# enqueue the Components in `run_queue` at the start using the order they are added in the Pipeline.
# If a Component A with defaults is added before a Component B that has no defaults, but in the Pipeline
# logic A must be executed after B. However, B could run before A if we don't do this check.
all_with_default_inputs = False

if not _is_lazy_variadic(comp) and not _has_all_inputs_with_defaults(comp):
# Keep track of the Components that are not lazy variadic and don't have all inputs with defaults.
# We'll handle these later if necessary.
filtered_waiting_queue.append((name, comp))

# If all Components are lazy variadic or all Components have all inputs with defaults we can get one to run
if all_lazy_variadic or all_with_default_inputs:
return waiting_queue[0]

for name, comp in filtered_waiting_queue:
# Find the first component that has all the inputs it needs to run
has_enough_inputs = True
for input_socket in comp.__haystack_input__._sockets_dict.values(): # type: ignore
if input_socket.name not in components_inputs.get(name, {}) and input_socket.is_mandatory:
has_enough_inputs = False
break

if has_enough_inputs:
return name, comp
def count_inputs(name, comp):
# Count the number of inputs that are provided
# Returns None if a mandatory input is missing, an integer otherwise
# This can ensure that components with only default inputs are not run
num_inputs = 0
for input_socket in comp.__haystack_input__._sockets_dict.values():
if (
input_socket.name not in components_inputs.get(name, {})
and input_socket.is_mandatory
and not input_socket.is_variadic
):
return None
if input_socket.name in components_inputs.get(name, {}):
num_inputs += 1
return num_inputs

queue = [(name, comp, _is_lazy_variadic(comp), count_inputs(name, comp)) for name, comp in waiting_queue]

first_runnable = next(
(
(name, comp)
for (name, comp, lazy_variadic, num_inputs) in queue
if not lazy_variadic and num_inputs is not None and num_inputs > 0
),
None,
)
if first_runnable:
return first_runnable

first_lazy_variadic = next(
(
(name, comp)
for (name, comp, lazy_variadic, num_inputs) in queue
if lazy_variadic and num_inputs is not None and num_inputs > 0
),
None,
)
if first_lazy_variadic:
return first_lazy_variadic

# Return components that get no input at all but have default arguments
first_default_lazy_variadic = next(
(
(name, comp)
for (name, comp, lazy_variadic, num_inputs) in queue
if lazy_variadic and num_inputs is not None
),
None,
)
if first_default_lazy_variadic:
return first_default_lazy_variadic

first_default = next(
(
(name, comp)
for (name, comp, lazy_variadic, num_inputs) in queue
if not lazy_variadic and num_inputs is not None
),
None,
)
if first_default:
return first_default

# If we reach this point it means that we found no Component that has enough inputs to run.
# Ideally we should never reach this point, though we can't raise an exception either as
# existing use cases rely on this behavior.
# So we return the last Component, that could be the last from waiting_queue or filtered_waiting_queue.
return name, comp
return None

def _find_next_runnable_lazy_variadic_or_default_component(
self, waiting_queue: List[Tuple[str, Component]]
Expand Down
Loading
Loading