
Airflow provider not working properly with Ray 1.4 #27

Open

isaac-vidas opened this issue Jun 14, 2021 · 2 comments

@isaac-vidas

After upgrading to Ray 1.4 (both in the Airflow image and in the cluster), some capabilities stopped working.
For example, in the xgboost_pandas_breast_cancer_tune.py example, the split_train_test step fails with the following error:

*** Reading local file: /usr/local/airflow/logs/xgboost_pandas_breast_cancer_tune/split_train_test/2021-06-13T21:56:24.701279+00:00/1.log
[2021-06-13 21:56:35,118] {taskinstance.py:877} INFO - Dependencies all met for <TaskInstance: xgboost_pandas_breast_cancer_tune.split_train_test 2021-06-13T21:56:24.701279+00:00 [queued]>
[2021-06-13 21:56:35,142] {taskinstance.py:877} INFO - Dependencies all met for <TaskInstance: xgboost_pandas_breast_cancer_tune.split_train_test 2021-06-13T21:56:24.701279+00:00 [queued]>
[2021-06-13 21:56:35,142] {taskinstance.py:1068} INFO - 
--------------------------------------------------------------------------------
[2021-06-13 21:56:35,142] {taskinstance.py:1069} INFO - Starting attempt 1 of 1
[2021-06-13 21:56:35,142] {taskinstance.py:1070} INFO - 
--------------------------------------------------------------------------------
[2021-06-13 21:56:35,151] {taskinstance.py:1089} INFO - Executing <Task(_PythonDecoratedOperator): split_train_test> on 2021-06-13T21:56:24.701279+00:00
[2021-06-13 21:56:35,157] {standard_task_runner.py:52} INFO - Started process 214 to run task
[2021-06-13 21:56:35,162] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'xgboost_pandas_breast_cancer_tune', 'split_train_test', '2021-06-13T21:56:24.701279+00:00', '--job-id', '58', '--pool', 'ray_worker_pool', '--raw', '--subdir', 'DAGS_FOLDER/xgboost_pandas_breast_cancer_tune.py', '--cfg-path', '/tmp/tmpsrl2sg11', '--error-file', '/tmp/tmp0xsgp8gd']
[2021-06-13 21:56:35,163] {standard_task_runner.py:77} INFO - Job 58: Subtask split_train_test
[2021-06-13 21:56:35,219] {logging_mixin.py:104} INFO - Running <TaskInstance: xgboost_pandas_breast_cancer_tune.split_train_test 2021-06-13T21:56:24.701279+00:00 [running]> on host eab106ed84ba
[2021-06-13 21:56:35,283] {taskinstance.py:1283} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=xgboost_pandas_breast_cancer_tune
AIRFLOW_CTX_TASK_ID=split_train_test
AIRFLOW_CTX_EXECUTION_DATE=2021-06-13T21:56:24.701279+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-06-13T21:56:24.701279+00:00
[2021-06-13 21:56:35,285] {ray_decorators.py:15} INFO - [wrapper] Got executor.
[2021-06-13 21:56:35,300] {base.py:78} INFO - Using connection to: id: ray_cluster_connection. Host: 192.168.106.200, Port: 10001, Schema: , Login: , Password: None, extra: None
[2021-06-13 21:56:35,300] {ray_client.py:48} INFO - Connection base_url is 192.168.106.200:10001
[2021-06-13 21:56:37,233] {ray_client.py:55} INFO - New Ray Connection Established
[2021-06-13 21:56:37,239] {ray_backend.py:116} INFO - Failed to look up actor with name 'ray_kv_store'. You are either trying to look up a named actor you didn't create, the named actor died, or the actor hasn't been created because named actor creation is asynchronous.
[2021-06-13 21:56:37,239] {ray_backend.py:118} INFO - Creating new Actor with identifier Failed to look up actor with name 'ray_kv_store'. You are either trying to look up a named actor you didn't create, the named actor died, or the actor hasn't been created because named actor creation is asynchronous.
[2021-06-13 21:56:37,322] {ray_decorators.py:19} INFO - [wrapper] Launching task (with ('ObjectRef(21bbb17697b5270dffffffffffffffffffffffff0200000002000000)',), {}.
[2021-06-13 21:56:37,322] {filelock.py:274} INFO - Lock 140149722366096 acquired on /tmp/ray_kv.lock
[2021-06-13 21:56:37,330] {logging_mixin.py:104} WARNING - It looks like you're creating a detached actor in an anonymous namespace. In order to access this actor in the future, you will need to explicitly connect to this namespace with ray.init(namespace="35b665fe-8dc7-40bf-9920-834e69183299", ...)
[2021-06-13 21:56:40,341] {filelock.py:318} INFO - Lock 140149722366096 released on /tmp/ray_kv.lock
[2021-06-13 21:56:40,341] {taskinstance.py:1482} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1138, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1341, in _execute_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.7/site-packages/airflow/operators/python.py", line 233, in execute
    return_value = self.python_callable(*self.op_args, **self.op_kwargs)
  File "/usr/local/lib/python3.7/site-packages/ray_provider/decorators/ray_decorators.py", line 20, in wrapper
    ret_str = executor.execute(f, args=args, kwargs=kwargs, eager=eager)
  File "/usr/local/lib/python3.7/site-packages/ray_provider/xcom/ray_backend.py", line 135, in execute
    return ray.get(res)
  File "/usr/local/lib/python3.7/site-packages/ray/_private/client_mode_hook.py", line 61, in wrapper
    return getattr(ray, func.__name__)(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/ray/util/client/api.py", line 42, in get
    return self.worker.get(vals, timeout=timeout)
  File "/usr/local/lib/python3.7/site-packages/ray/util/client/worker.py", line 202, in get
    res = self._get(obj_ref, op_timeout)
  File "/usr/local/lib/python3.7/site-packages/ray/util/client/worker.py", line 225, in _get
    raise err
types.RayTaskError(TypeError): ray::_KvStoreActor.execute() (pid=14747, ip=0.0.0.0)
  File "python/ray/_raylet.pyx", line 501, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 451, in ray._raylet.execute_task.function_executor
  File "/Users/ividas/.pyenv/versions/3.7.10/envs/test_ray_1_4/lib/python3.7/site-packages/ray/_private/function_manager.py", line 563, in actor_method_executor
    return method(__ray_actor, *args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/ray_provider/xcom/ray_backend.py", line 90, in execute
  File "/Users/ividas/.pyenv/versions/3.7.10/envs/test_ray_1_4/lib/python3.7/site-packages/ray/_private/client_mode_hook.py", line 62, in wrapper
    return func(*args, **kwargs)
ray.exceptions.RayTaskError(TypeError): ray::status_function() (pid=14750, ip=0.0.0.0)
  File "python/ray/_raylet.pyx", line 501, in ray._raylet.execute_task
  File "/Users/ividas/.pyenv/versions/3.7.10/envs/test_ray_1_4/lib/python3.7/site-packages/ray/util/tracing/tracing_helper.py", line 330, in _function_with_tracing
    return function(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/ray_provider/xcom/ray_backend.py", line 72, in status_function
  File "/usr/local/airflow/dags/xgboost_pandas_breast_cancer_tune.py", line 90, in split_train_test
TypeError: cannot unpack non-iterable NoneType object
[2021-06-13 21:56:40,343] {taskinstance.py:1532} INFO - Marking task as FAILED. dag_id=xgboost_pandas_breast_cancer_tune, task_id=split_train_test, execution_date=20210613T215624, start_date=20210613T215635, end_date=20210613T215640
[2021-06-13 21:56:40,394] {local_task_job.py:146} INFO - Task exited with return code 1
[2021-06-13 21:56:40,416] {ray_backend.py:180} ERROR - Cleaning up from Failure: {'conf': <airflow.configuration.AirflowConfigParser object at 0x7f77266fdd90>, 'dag': <DAG: xgboost_pandas_breast_cancer_tune>, 'dag_run': <DagRun xgboost_pandas_breast_cancer_tune @ 2021-06-13 21:56:24.701279+00:00: manual__2021-06-13T21:56:24.701279+00:00, externally triggered: True>, 'ds': '2021-06-13', 'ds_nodash': '20210613', 'execution_date': DateTime(2021, 6, 13, 21, 56, 24, 701279, tzinfo=Timezone('+00:00')), 'inlets': [], 'macros': <module 'airflow.macros' from '/usr/local/lib/python3.7/site-packages/airflow/macros/__init__.py'>, 'next_ds': '2021-06-13', 'next_ds_nodash': '20210613', 'next_execution_date': DateTime(2021, 6, 13, 21, 56, 24, 701279, tzinfo=Timezone('+00:00')), 'outlets': [], 'params': {}, 'prev_ds': '2021-06-13', 'prev_ds_nodash': '20210613', 'prev_execution_date': DateTime(2021, 6, 13, 21, 56, 24, 701279, tzinfo=Timezone('+00:00')), 'prev_execution_date_success': <Proxy at 0x7f76e249e2d0 with factory <function TaskInstance.get_template_context.<locals>.<lambda> at 0x7f76e24a0f80>>, 'prev_start_date_success': <Proxy at 0x7f76e249e820 with factory <function TaskInstance.get_template_context.<locals>.<lambda> at 0x7f76e24b6f80>>, 'run_id': 'manual__2021-06-13T21:56:24.701279+00:00', 'task': <Task(_PythonDecoratedOperator): split_train_test>, 'task_instance': <TaskInstance: xgboost_pandas_breast_cancer_tune.split_train_test 2021-06-13T21:56:24.701279+00:00 [failed]>, 'task_instance_key_str': 'xgboost_pandas_breast_cancer_tune__split_train_test__20210613', 'test_mode': False, 'ti': <TaskInstance: xgboost_pandas_breast_cancer_tune.split_train_test 2021-06-13T21:56:24.701279+00:00 [failed]>, 'tomorrow_ds': '2021-06-14', 'tomorrow_ds_nodash': '20210614', 'ts': '2021-06-13T21:56:24.701279+00:00', 'ts_nodash': '20210613T215624', 'ts_nodash_with_tz': '20210613T215624.701279+0000', 'var': {'json': None, 'value': None}, 'yesterday_ds': '2021-06-12', 'yesterday_ds_nodash': '20210612', 'exception': '\x1b[36mray::_KvStoreActor.execute()\x1b[39m (pid=14747, ip=0.0.0.0)\n  File "python/ray/_raylet.pyx", line 501, in ray._raylet.execute_task\n  File "python/ray/_raylet.pyx", line 451, in ray._raylet.execute_task.function_executor\n  File "/Users/ividas/.pyenv/versions/3.7.10/envs/test_ray_1_4/lib/python3.7/site-packages/ray/_private/function_manager.py", line 563, in actor_method_executor\n    return method(__ray_actor, *args, **kwargs)\n  File "/usr/local/lib/python3.7/site-packages/ray_provider/xcom/ray_backend.py", line 90, in execute\n  File "/Users/ividas/.pyenv/versions/3.7.10/envs/test_ray_1_4/lib/python3.7/site-packages/ray/_private/client_mode_hook.py", line 62, in wrapper\n    return func(*args, **kwargs)\nray.exceptions.RayTaskError(TypeError): \x1b[36mray::status_function()\x1b[39m (pid=14750, ip=0.0.0.0)\n  File "python/ray/_raylet.pyx", line 501, in ray._raylet.execute_task\n  File "/Users/ividas/.pyenv/versions/3.7.10/envs/test_ray_1_4/lib/python3.7/site-packages/ray/util/tracing/tracing_helper.py", line 330, in _function_with_tracing\n    return function(*args, **kwargs)\n  File "/usr/local/lib/python3.7/site-packages/ray_provider/xcom/ray_backend.py", line 72, in status_function\n  File "/usr/local/airflow/dags/xgboost_pandas_breast_cancer_tune.py", line 90, in split_train_test\nTypeError: cannot unpack non-iterable NoneType object'}
[2021-06-13 21:56:40,417] {ray_backend.py:330} INFO - Checking for failed dependents
[2021-06-13 21:56:40,444] {ray_backend.py:198} ERROR - Time to clean up
[2021-06-13 21:56:40,445] {filelock.py:274} INFO - Lock 140149509146768 acquired on /tmp/ray_backend.lock
[2021-06-13 21:56:40,445] {filelock.py:274} INFO - Lock 140149508886416 acquired on /tmp/ray_kv.lock
[2021-06-13 21:56:40,458] {base.py:78} INFO - Using connection to: id: ray_cluster_connection. Host: 192.168.106.200, Port: 10001, Schema: , Login: , Password: None, extra: None
[2021-06-13 21:56:40,458] {ray_client.py:48} INFO - Connection base_url is 192.168.106.200:10001
[2021-06-13 21:56:42,354] {ray_client.py:55} INFO - New Ray Connection Established
[2021-06-13 21:56:42,360] {ray_backend.py:116} INFO - Failed to look up actor with name 'ray_kv_store'. You are either trying to look up a named actor you didn't create, the named actor died, or the actor hasn't been created because named actor creation is asynchronous.
[2021-06-13 21:56:42,360] {ray_backend.py:214} ERROR - Error getting store on cleanup Failed to look up actor with name 'ray_kv_store'. You are either trying to look up a named actor you didn't create, the named actor died, or the actor hasn't been created because named actor creation is asynchronous.
[2021-06-13 21:56:42,361] {ray_client.py:73} INFO - Cleaning connections
[2021-06-13 21:56:42,369] {base.py:78} INFO - Using connection to: id: ray_cluster_connection. Host: 192.168.106.200, Port: 10001, Schema: , Login: , Password: None, extra: None
[2021-06-13 21:56:43,494] {filelock.py:318} INFO - Lock 140149508886416 released on /tmp/ray_kv.lock
[2021-06-13 21:56:43,494] {filelock.py:318} INFO - Lock 140149509146768 released on /tmp/ray_backend.lock

For some reason it can't get the ray_kv_store actor that was created in the previous step.
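This looks like it could be the namespace isolation introduced in Ray 1.4: each anonymous client connection gets its own namespace, so a detached actor created by one task isn't visible to the next connection. A minimal sketch of the behavior outside the provider (the cluster address and the actor are hypothetical, and the ray:// syntax in ray.init assumes a newer Ray client; the provider connects through its own hook):

```python
import ray


@ray.remote
class KvStore:  # hypothetical stand-in for the provider's _KvStoreActor
    def __init__(self):
        self._data = {}

    def put(self, key, value):
        self._data[key] = value

    def get(self, key):
        return self._data.get(key)


# Task A: an anonymous client connection creates the named, detached actor.
ray.init("ray://192.168.106.200:10001")
KvStore.options(name="ray_kv_store", lifetime="detached").remote()
ray.shutdown()

# Task B: a fresh anonymous connection lands in a different namespace on Ray 1.4,
# so the lookup fails with "Failed to look up actor with name 'ray_kv_store'".
ray.init("ray://192.168.106.200:10001")
store = ray.get_actor("ray_kv_store")  # raises under Ray 1.4

# Workaround suggested by the warning: pin both connections to one namespace, e.g.
# ray.init("ray://192.168.106.200:10001", namespace="airflow").
```

In Ray 1.3 named actors were visible to every connection, which would explain why the same DAG worked before the upgrade.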

Just for comparison, with Ray 1.3, the step runs successfully with the following log:

*** Reading local file: /usr/local/airflow/logs/xgboost_pandas_breast_cancer_tune/split_train_test/2021-06-13T15:41:27.795113+00:00/7.log
[2021-06-13 19:53:30,692] {taskinstance.py:877} INFO - Dependencies all met for <TaskInstance: xgboost_pandas_breast_cancer_tune.split_train_test 2021-06-13T15:41:27.795113+00:00 [queued]>
[2021-06-13 19:53:30,729] {taskinstance.py:877} INFO - Dependencies all met for <TaskInstance: xgboost_pandas_breast_cancer_tune.split_train_test 2021-06-13T15:41:27.795113+00:00 [queued]>
[2021-06-13 19:53:30,729] {taskinstance.py:1068} INFO - 
--------------------------------------------------------------------------------
[2021-06-13 19:53:30,729] {taskinstance.py:1069} INFO - Starting attempt 7 of 7
[2021-06-13 19:53:30,729] {taskinstance.py:1070} INFO - 
--------------------------------------------------------------------------------
[2021-06-13 19:53:30,746] {taskinstance.py:1089} INFO - Executing <Task(_PythonDecoratedOperator): split_train_test> on 2021-06-13T15:41:27.795113+00:00
[2021-06-13 19:53:30,752] {standard_task_runner.py:52} INFO - Started process 330 to run task
[2021-06-13 19:53:30,757] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'xgboost_pandas_breast_cancer_tune', 'split_train_test', '2021-06-13T15:41:27.795113+00:00', '--job-id', '51', '--pool', 'ray_worker_pool', '--raw', '--subdir', 'DAGS_FOLDER/xgboost_pandas_breast_cancer_tune.py', '--cfg-path', '/tmp/tmpwi_f7c8v', '--error-file', '/tmp/tmplc4fypv2']
[2021-06-13 19:53:30,758] {standard_task_runner.py:77} INFO - Job 51: Subtask split_train_test
[2021-06-13 19:53:30,814] {logging_mixin.py:104} INFO - Running <TaskInstance: xgboost_pandas_breast_cancer_tune.split_train_test 2021-06-13T15:41:27.795113+00:00 [running]> on host 4f4f06a13af3
[2021-06-13 19:53:30,895] {taskinstance.py:1283} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=xgboost_pandas_breast_cancer_tune
AIRFLOW_CTX_TASK_ID=split_train_test
AIRFLOW_CTX_EXECUTION_DATE=2021-06-13T15:41:27.795113+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-06-13T15:41:27.795113+00:00
[2021-06-13 19:53:30,895] {ray_decorators.py:15} INFO - [wrapper] Got executor.
[2021-06-13 19:53:30,908] {base.py:78} INFO - Using connection to: id: ray_cluster_connection. Host: 192.168.106.200, Port: 10001, Schema: , Login: , Password: None, extra: None
[2021-06-13 19:53:30,908] {ray_client.py:48} INFO - Connection base_url is 192.168.106.200:10001
[2021-06-13 19:53:31,150] {ray_client.py:55} INFO - New Ray Connection Established
[2021-06-13 19:53:31,153] {ray_decorators.py:19} INFO - [wrapper] Launching task (with ('ObjectRef(b3d7bf38f6985fccffffffffffffffffffffffff0100000002000000)',), {}.
[2021-06-13 19:53:31,153] {filelock.py:274} INFO - Lock 139948702813520 acquired on /tmp/ray_kv.lock
[2021-06-13 19:53:34,043] {filelock.py:318} INFO - Lock 139948702813520 released on /tmp/ray_kv.lock
[2021-06-13 19:53:34,044] {ray_decorators.py:21} INFO - [wrapper] Remote task finished
[2021-06-13 19:53:34,187] {taskinstance.py:1192} INFO - Marking task as SUCCESS. dag_id=xgboost_pandas_breast_cancer_tune, task_id=split_train_test, execution_date=20210613T154127, start_date=20210613T195330, end_date=20210613T195334
[2021-06-13 19:53:34,219] {taskinstance.py:1246} INFO - 1 downstream tasks scheduled from follow-on schedule check
[2021-06-13 19:53:34,274] {local_task_job.py:146} INFO - Task exited with return code 0
[2021-06-13 19:53:34,300] {ray_backend.py:220} INFO - DAG marked success, cleaning up
@vvbandeira

I am having a similar issue: I can't pass data between functions. I also get:

Failed to look up actor with name 'ray_kv_store'.
WARNING - It looks like you're creating a detached actor in an anonymous namespace.

Not sure if it's the same issue, since my setup is different. I am running inside a Kubernetes cluster managed by Google Cloud Composer. The Python version is 3.8 (locked by Composer) and I have Ray 1.6.0.

@isaac-vidas did you ever find a solution or workaround?

@isaac-vidas
Author

@vvbandeira I haven't tried this since, but I noticed that the related PR #30 was merged, so I thought this was already fixed.

In general, I'm currently taking a different path with Airflow. Our Airflow runs with the KubernetesExecutor, so we run the driver (ray.init) from inside the task pod to work with the remote Ray cluster. This also enables us to use different Python / Ray versions across Ray clusters.
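Roughly, the pattern looks like this (a sketch only, not the provider's API; the cluster address, DAG, and task names are hypothetical, and ray:// in ray.init assumes Ray 1.5+ client support in the task image):

```python
# Each Airflow task runs its own Ray driver inside its KubernetesExecutor pod
# and connects to the remote Ray cluster via the Ray client.
from datetime import datetime

import ray
from airflow.decorators import dag, task


@dag(schedule_interval=None, start_date=datetime(2021, 6, 1), catchup=False)
def ray_from_task_pod():
    @task
    def split_train_test():
        # Connect to the remote cluster from inside the task's pod; the task
        # image must ship a Ray/Python version matching the cluster.
        ray.init("ray://ray-head.ray.svc:10001", namespace="airflow")
        try:
            @ray.remote
            def heavy_work(x):
                return x * 2

            # Result is returned to Airflow as a regular XCom value.
            return ray.get(heavy_work.remote(21))
        finally:
            ray.shutdown()

    split_train_test()


dag_instance = ray_from_task_pod()
```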
