
Celery Task Custom Tracking Method

My main problem is that I need to know whether a task is still queued, started or revoked. I can't do this with Celery and Redis because 24hs after the results are in re

Solution 1:

You probably have to combine multiple approaches. If your results expire in the backend (which is reasonable), you need separate storage, such as a database, for long-term archiving of task states. For a start, you can enable task_track_started so that tasks report the STARTED status when a worker starts executing them. Then periodically check the results backend for status updates of tasks that are not yet in a ready state (SUCCESS, FAILURE or REVOKED). Once a task has reached one of these final states, remove its result from the backend using the forget() method.
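As a rough illustration of the settings involved, a Celery configuration module might look like the sketch below. The module name celeryconfig.py and the Redis URLs are placeholders, not taken from the question. task_track_started and result_expires are standard Celery settings; result_expires defaults to one day, which is probably why the results disappear after 24 hours.

```python
# celeryconfig.py (hypothetical module name; the URLs below are placeholders)
broker_url = 'redis://localhost:6379/0'
result_backend = 'redis://localhost:6379/1'

# Report STARTED when a worker begins executing a task
# (without this, a task stays PENDING until it finishes).
task_track_started = True

# Lifetime of stored results in seconds; one day is Celery's default.
result_expires = 24 * 60 * 60
```

Such a module would typically be loaded with app.config_from_object('celeryconfig').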

The only problem is with revoked tasks. If there are no workers available, revoking the task has no effect (that's why you should always wait for a reply when calling revoke). If the workers are busy and the task therefore remains in the message queue, the workers just note that the task should be discarded when they pick it up from the queue, but this note is stored only in the workers' state. Once a worker takes the task, it drops it, and the result eventually contains the REVOKED status. The key point is that revoked tasks are tracked only in the workers' state, so you should use the --statedb parameter to persist that state in case a worker crashes. Otherwise, already revoked tasks will happily get processed by the same or another worker.
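If you prefer configuration over the command line, the same persistence can, as far as I know, also be set through the worker_state_db setting. A minimal sketch, with a made-up file path that you would adapt to your deployment:

```python
# Fragment of a Celery configuration module (the path is an assumption).
# The worker stores its persistent state, including the list of revoked
# task ids, in this file so that it survives a restart.
worker_state_db = '/var/run/celery/worker.state'
```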

Your best option, I guess, is to call the revoke command and, if you get a reply from the workers, set the internal status of the task in your database to something like FLAGGED_REVOKED. In the status update loop, update the revoked task's status only if the backend reports something other than PENDING.
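The rule in the previous paragraph can be isolated as a small pure function. This is only a sketch: the function name next_task_state is made up for illustration, and FLAGGED_REVOKED is an application-level status, not a Celery state. The PENDING check matters because the result backend reports PENDING for any task it knows nothing about, including a revoked task that no worker has picked up yet.

```python
def next_task_state(stored_state, backend_state):
    """Return the state to store, given the state the result backend reports.

    stored_state  -- the state currently kept in our own database
    backend_state -- the state reported by the Celery result backend
    """
    # A task flagged as revoked keeps the flag until a worker has actually
    # dropped it and the backend reports something other than PENDING.
    if stored_state == 'FLAGGED_REVOKED' and backend_state == 'PENDING':
        return 'FLAGGED_REVOKED'
    # In every other case the backend is the source of truth.
    return backend_state
```

For example, next_task_state('FLAGGED_REVOKED', 'PENDING') keeps the flag, while next_task_state('FLAGGED_REVOKED', 'REVOKED') moves the run to its final state.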

I have a simple job scheduling app that uses APScheduler as the scheduler and Celery as the execution layer. Information about the jobs, job runs and the schedule is kept in MongoDB. Here is the code I use to cancel a job:

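# `scheduler` is the APScheduler instance and `celery` is the Celery app;
# `job_id`, `run_id` and `terminate` are supplied by the caller.
database = scheduler._jobstores['default'].collection.database
collection = database['runs']

run = collection.find_one({'job_id': job_id, '_id': run_id})
if run is None:
    raise Exception('Job run not found')
# Only runs that have not reached a final state can be canceled.
if run.get('task_state') in ('PENDING', 'RECEIVED', 'STARTED', 'RETRY'):
    # reply=True waits for the workers to acknowledge the revoke request.
    reply = celery.control.revoke(run['task_id'], terminate=terminate, reply=True)
    if reply:
        collection.update_one({'_id': run['_id']},
                              {'$set': {'task_state': 'FLAGGED_REVOKED'}})
    else:
        raise Exception('Failed to revoke the task (no reply received)')
else:
    raise Exception('Job execution cannot be canceled')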

This is my status update code, which runs every few seconds as an internal APScheduler job:

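from celery.result import AsyncResult

# `dateparser` below is assumed to be an imported date-parsing module,
# e.g. `from dateutil import parser as dateparser`.
database = scheduler._jobstores['default'].collection.database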
collection = database['runs']

runs = collection.find({
    'task_id': {'$exists': True},
    'task_state': {'$nin': ['SUCCESS', 'FAILURE', 'REVOKED']}
})
for run in runs:
    result = AsyncResult(run['task_id'],
                         backend=celery.backend, app=celery)
    if run['task_state'] == 'FLAGGED_REVOKED' and result.state == 'PENDING':
        update = {'task_state': 'FLAGGED_REVOKED'}
    else:
        update = {'task_state': result.state}
    if result.state == 'FAILURE':
        update['exception'] = str(result.result)
        update['traceback'] = result.traceback
    elif result.state == 'SUCCESS':
        update['result'] = result.result
    if result.date_done:
        date_done = dateparser.parse(result.date_done) \
            if isinstance(result.date_done, str) else result.date_done
        update['finish_time'] = date_done
    try:
        collection.update_one({'_id': run['_id']}, {'$set': update})
    except Exception as e:
        print('Failed to update task status: %s' % e)
    else:
        if result.state in ['SUCCESS', 'FAILURE', 'REVOKED']:
            result.forget()
