I would like to implement the following use case with the ViewFlow library:
Problem
Processes of a particular Flow, started by a user, must wait in a queue before executing a Celery job. Each user has a queue of these processes. Based on a schedule, or triggered manually, the next process in the queue is allowed to proceed.
Example
A node within my flow enters a named queue. Other logic within the application determines, for each queue, when to allow the next task to proceed. The next task in the queue is then selected and its activation's done() method is called.
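To make the intended behaviour concrete, here is a minimal in-memory sketch of the queue semantics described above. It does not use ViewFlow at all; `NamedQueues`, `enter` and `release_next` are hypothetical names made up for illustration, and process ids are plain integers.

```python
from collections import defaultdict, deque


class NamedQueues:
    """Hypothetical in-memory model of named FIFO queues (not a ViewFlow API)."""

    def __init__(self):
        # Maps a queue name to the process ids waiting in it, oldest first.
        self._waiting = defaultdict(deque)

    def enter(self, queue_name, process_id):
        """A flow node parks its process at the back of the named queue."""
        self._waiting[queue_name].append(process_id)

    def release_next(self, queue_name):
        """Let the oldest waiting process proceed; None if the queue is empty."""
        queue = self._waiting[queue_name]
        return queue.popleft() if queue else None


queues = NamedQueues()
queues.enter("alice", 101)
queues.enter("bob", 201)
queues.enter("alice", 102)

first_for_alice = queues.release_next("alice")
first_for_bob = queues.release_next("bob")
second_for_alice = queues.release_next("alice")
nothing_left = queues.release_next("bob")
```

Each queue releases its processes in arrival order, and releasing from one queue never affects another. In the real flow, "releasing" would correspond to completing the waiting task so that the process moves on to the Celery job.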
An example flow might look like this:
class MyFlow(Flow):
    start = flow.Start(...).Next(this.queue_wait)
    queue_wait = QueueWait("myQueue").Next(this.job)
    job = celery.Job(...).Next(this.end)
    end = flow.End()
Question
What would be the best approach to implement queueing? In the above example, I don't know what "QueueWait" should be.
I've read through the docs and viewflow code, but it's not yet clear to me if this can be done using built-in Node and Activation classes, such as func.Function, or if I need to extend with custom classes.
After much experimentation, I arrived at a workable and simple solution:
from viewflow.flow import base
from viewflow.flow.func import FuncActivation
from viewflow.activation import STATUS


class Queue(base.NextNodeMixin,
            base.UndoViewMixin,
            base.CancelViewMixin,
            base.DetailsViewMixin,
            base.Event):
    """
    Node that halts the flow and waits in a queue. To process the next waiting
    task, call the dequeue method, optionally specifying the task owner.

    Example placing a job in a queue::

        class MyFlow(Flow):
            wait = Queue().Next(this.job)
            job = celery.Job(send_stuff).Next(this.end)
            end = flow.End()

    Somewhere in the application code::

        MyFlow.wait.dequeue()

    or::

        MyFlow.wait.dequeue(process__myprocess__owner=user)

    Queues are logically separated by task_type, so a new queue can be defined
    in a subclass by overriding the task_type attribute.
    """

    task_type = 'QUEUE'
    activation_cls = FuncActivation

    def __init__(self, **kwargs):
        super(Queue, self).__init__(**kwargs)

    def dequeue(self, **kwargs):
        """
        Process the next task in the queue, ordered by created date/time.
        kwargs adds task filter arguments, effectively splitting the queue
        into sub-queues. This could be used to implement per-user queues.

        Returns True if a task was found and dequeued, False otherwise.
        """
        # Only tasks of this queue type that have not been processed yet.
        filter_kwargs = {'flow_task_type': self.task_type, 'status': STATUS.NEW}
        filter_kwargs.update(kwargs)

        # Oldest waiting task first (FIFO).
        task = self.flow_cls.task_cls.objects.filter(**filter_kwargs).order_by('created').first()
        if task is not None:
            lock = self.flow_cls.lock_impl(self.flow_cls.instance)
            with lock(self.flow_cls, task.process_id):
                # Re-read the task now that the process lock is held.
                task = self.flow_cls.task_cls._default_manager.get(pk=task.pk)
                activation = self.activation_cls()
                activation.initialize(self, task)
                activation.prepare()
                activation.done()
            return True
        return False
I tried to make it as generic as possible and support the definition of multiple named queues as well as sub-queues, such as per-user queues.
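The selection rule in dequeue (oldest NEW task of the given task_type, narrowed by any extra filters) can be tried out without Django. The following is only a sketch of that rule under simplifying assumptions: `Task`, `enqueue` and the module-level `dequeue` are hypothetical stand-ins, filters are plain attribute equality rather than ORM lookups, an integer counter replaces the created timestamp, and the function returns the task instead of True/False so the result is easy to inspect.

```python
from dataclasses import dataclass
from itertools import count

_created = count()  # stand-in for the task's created timestamp


@dataclass
class Task:
    """Hypothetical stand-in for a task row; not the ViewFlow model."""
    flow_task_type: str
    owner: str
    status: str = "NEW"
    created: int = 0


def enqueue(tasks, flow_task_type, owner):
    """Append a new waiting task and return it."""
    task = Task(flow_task_type, owner, created=next(_created))
    tasks.append(task)
    return task


def dequeue(tasks, task_type, **filters):
    """Mark the oldest NEW task matching task_type and filters as DONE."""
    wanted = {"flow_task_type": task_type, "status": "NEW", **filters}
    matching = [t for t in tasks
                if all(getattr(t, name) == value for name, value in wanted.items())]
    if not matching:
        return None
    task = min(matching, key=lambda t: t.created)
    task.status = "DONE"
    return task


tasks = []
a1 = enqueue(tasks, "QUEUE", "alice")
b1 = enqueue(tasks, "QUEUE", "bob")
a2 = enqueue(tasks, "REPORT_QUEUE", "alice")

released_bob = dequeue(tasks, "QUEUE", owner="bob")       # bob's sub-queue only
released_any = dequeue(tasks, "QUEUE")                    # oldest in the whole queue
released_report = dequeue(tasks, "REPORT_QUEUE", owner="alice")
empty = dequeue(tasks, "QUEUE", owner="bob")              # bob has nothing left
```

Here "REPORT_QUEUE" plays the role of a second named queue, which in the node above would come from a subclass that overrides task_type, and the owner filter plays the role of the per-user sub-queue.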