Task_id must not be empty. Got None instead #117
Comments
Can you provide sample code for reproducing this?
Sure. So I have my main function which is:
That futures.append line is actually inside the with block; somehow the indentation is lost when I paste it.
Let me know if you need any more info. It seems to me that I somehow need to pass the task_id into the threaded function, but I'm not sure how.
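One way to pass the task id into the threaded function, sketched under stated assumptions: read the id once in the task body, hand it to each worker as a plain argument, and have the worker call `update_state` with an explicit `task_id`. `FakeTask` below is a hypothetical stand-in so the block runs without a broker; in a real bound task you would pass `self` and `self.request.id` instead. The names `sub_task` and `main_task`, the `PROGRESS` state, and the `current`/`total` meta keys are illustrative assumptions, not something the library confirms here.

```python
from concurrent.futures import ThreadPoolExecutor
import threading


class FakeTask:
    """Hypothetical stand-in for a bound Celery task (no broker needed)."""

    def __init__(self):
        self.updates = []
        self._lock = threading.Lock()

    def update_state(self, task_id=None, state=None, meta=None):
        # Simplified imitation of the failure in this issue: with no id
        # available, the state update cannot be stored.
        if task_id is None:
            raise ValueError("task_id must not be empty. Got None instead.")
        with self._lock:
            self.updates.append((task_id, state, meta))


def sub_task(task, task_id, step, total):
    # Runs in a pool thread: rely on the explicit task_id argument,
    # not on any per-thread request context.
    task.update_state(
        task_id=task_id,
        state="PROGRESS",
        meta={"current": step, "total": total},
    )
    return step


def main_task(task, task_id, total):
    # In a real bound task: task would be `self`, task_id `self.request.id`.
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = [
            executor.submit(sub_task, task, task_id, step, total)
            for step in range(1, total + 1)
        ]
        return sorted(f.result() for f in futures)


fake = FakeTask()
results = main_task(fake, "abc-123", 3)
```

Because several threads may report at once, the order of the recorded updates is not guaranteed; a progress bar driven this way can briefly move backwards unless the workers share a counter.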
Also, is there any significance to the delay in this line: result = my_task.delay(10)? Sometimes my task won't start and I get an error on the delay line:
It works 50% of the time.
I was able to reproduce this with the following code:
Here's what ChatGPT has to say about it:

You're trying to use your [...]

Another point is that Celery is not built to work with threads inside a task. The usual practice is to divide complex jobs into small tasks and let Celery handle their execution. However, if you want to continue using threading, a possible solution could be to share the progress of the task with all the threads by placing it somewhere they can all access, for example a shared variable, and update this from the main task. Here's a simplified example:

    from concurrent.futures import ThreadPoolExecutor
    from celery import shared_task

    # shared variable
    progress = {
        'current': 0,
        'total': 10,
    }

    @shared_task(bind=True)
    def main_task(self, seconds):
        global progress  # access to shared variable
        print(self.request.id)
        with ThreadPoolExecutor() as executor:
            futures = []
            # setting some progress
            progress.update({'current': 6, 'description': 'Main task'})
            self.update_state(state='PROGRESS', meta=progress)
            futures.append(executor.submit(sub_task, seconds))

    def sub_task(seconds):
        global progress  # access to shared variable
        print(progress['description'])
        # updating progress
        progress.update({'current': 8, 'description': 'Sub task work'})
        # Note: we don't update Celery task state here, because we are outside of the task

In the above code, progress will be shared between the [...]

I'm inclined to agree that you should be using Celery primitives like subtasks or groups to achieve this and not the threading library. I think figuring out how to support this is beyond the scope of this project.
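If the threads stay, one pattern that avoids touching task state from the pool threads is to let the workers simply return, and report progress from the task's own thread as each future completes. This is a minimal sketch, assuming a recorder object with a `set_progress(current, total)` method like the one called in this issue. `FakeRecorder`, `work`, and `run_with_progress` are hypothetical names, and `FakeRecorder` exists only so the block runs without Celery.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


class FakeRecorder:
    """Hypothetical stand-in for a progress recorder bound to the task."""

    def __init__(self):
        self.calls = []

    def set_progress(self, current, total):
        self.calls.append((current, total))


def work(n):
    # Placeholder for the real per-item job done in a pool thread.
    return n * n


def run_with_progress(recorder, items):
    total = len(items)
    results = []
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(work, item) for item in items]
        # as_completed is consumed in the calling thread, so set_progress
        # runs in the task's thread, never in a pool thread.
        for done, future in enumerate(as_completed(futures), start=1):
            results.append(future.result())
            recorder.set_progress(done, total)
    return sorted(results)


recorder = FakeRecorder()
squares = run_with_progress(recorder, [1, 2, 3, 4])
```

Inside a real bound task, `recorder` would be the progress recorder created from `self`, and the loop body would stay the same.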
So the email I got had something different from what I see now, so I tried that first:
but then the code fails in sub_task where I have : saying self is not defined. I tried passing in self, task_id, seconds, and now it fails (inside sub_task) on:
task_id must not be empty. Got None instead.
My Celery progress bars work fine normally, but when I try to use them inside a function run by a ThreadPoolExecutor, they fail. I pass the self keyword. Inside the threaded function, whenever I set the progress bar as:
progress_recorder.set_progress(8, seconds)
I get an error message:
Task_id must not be empty. Got None instead
on that line.
Any help would be appreciated!
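A likely explanation, offered as an assumption rather than a confirmed diagnosis: Celery appears to keep the current request, and with it the task id, in per-thread storage, so a thread started by `ThreadPoolExecutor` sees an empty request even though it holds the same `self`. The sketch below reproduces that effect with the standard library's `threading.local`. `current_request`, `get_task_id`, and the id value are hypothetical and only stand in for Celery's internals.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Per-thread storage: attributes set in one thread are invisible in others.
current_request = threading.local()


def get_task_id():
    # Returns the id stored for the *calling* thread, or None if unset.
    return getattr(current_request, "task_id", None)


# The "task body" thread has an id...
current_request.task_id = "abc-123"
id_in_main_thread = get_task_id()

with ThreadPoolExecutor(max_workers=1) as executor:
    # ...but a pool thread looking it up finds nothing,
    id_in_pool_thread = executor.submit(get_task_id).result()
    # while an id passed as a plain argument arrives intact.
    id_passed_explicitly = executor.submit(
        lambda task_id: task_id, id_in_main_thread
    ).result()
```

If that is what happens here, passing `self` alone would not help, since the lookup still happens in the pool thread; the id itself has to be captured in the task body and passed along.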