How to implement a FIFO queue that supports namesp

Posted 2019-04-14 00:05

I'm using the following approach to handle a FIFO queue based on Google App Engine db.Model (see this question).

from google.appengine.ext import db
from google.appengine.ext import webapp
from google.appengine.ext.webapp.util import run_wsgi_app

class QueueItem(db.Model):
  created = db.DateTimeProperty(required=True, auto_now_add=True)
  data = db.BlobProperty(required=True)

  @staticmethod
  def push(data):
    """Add a new queue item."""
    return QueueItem(data=data).put()

  @staticmethod
  def pop():
    """Pop the oldest item off the queue."""
    def _tx_pop(candidate_key):
      # Try and grab the candidate key for ourselves. This will fail if
      # another task beat us to it.
      task = QueueItem.get(candidate_key)
      if task:
        task.delete()
      return task
    # Grab some tasks and try getting them until we find one that hasn't been
    # taken by someone else ahead of us
    while True:
      candidate_keys = QueueItem.all(keys_only=True).order('created').fetch(10)
      if not candidate_keys:
        # No tasks in queue
        return None
      for candidate_key in candidate_keys:
        task = db.run_in_transaction(_tx_pop, candidate_key)
        if task:
          return task

This queue works as expected (very good).

Right now my code has a method, invoked from a deferred queue, that accesses this FIFO queue:

def deferred_worker():
  data = QueueItem.pop()
  do_something_with(data)

I would like to enhance this method and the queue data structure by adding a client_ID parameter that identifies a specific client, so that each client accesses its own queue. Something like:

def deferred_worker(client_ID):
  data = QueueItem_of_this_client_ID.pop()  # I need to implement this
  do_something_with(data)

How could I code the Queue to be client_ID aware?

Constraints:
- The number of clients is dynamic and not predefined
- Taskqueue is not an option (1. it allows at most ten queues; 2. I would like to have full control over my queue)

Do you know how I could add this behaviour using the new Namespaces API? (Remember that I'm not calling the db.Model from a webapp.RequestHandler.)
Another option: I could add a client_ID db.StringProperty to QueueItem and use it as a filter in the pop method:

QueueItem.all(keys_only=True).filter('client_ID =', an_ID).order('created').fetch(10)

Any better idea?

2 Answers
唯我独甜
#2 · 2019-04-14 00:12

As I said in response to your query on my original answer, you don't need to do anything to make this work with namespaces: the datastore, on which the queue is built, already supports namespaces. Just set the namespace as desired, as described in the docs.
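
Since the worker runs as a deferred task rather than inside a request handler, one way to "set the namespace as desired" is to wrap the queue access in a small helper that switches namespace and always restores the previous one. The following is only a sketch: in_namespace is a hypothetical helper name, and the getter and setter are passed in as arguments so the snippet does not depend on the App Engine SDK being importable.

```python
import contextlib


@contextlib.contextmanager
def in_namespace(namespace, get_namespace, set_namespace):
    """Temporarily switch to `namespace`, restoring the old one on exit.

    `get_namespace` and `set_namespace` are injected callables. On App Engine
    they would presumably be namespace_manager.get_namespace and
    namespace_manager.set_namespace (an assumption of this sketch).
    """
    previous = get_namespace()
    set_namespace(namespace)
    try:
        yield
    finally:
        # Restore even if the body raises, so later datastore calls in the
        # same process do not silently run in the client's namespace.
        set_namespace(previous)
```

Under those assumptions, deferred_worker(client_ID) could call QueueItem.pop() inside "with in_namespace(client_ID, namespace_manager.get_namespace, namespace_manager.set_namespace):" and leave the QueueItem class itself unchanged.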

forever°为你锁心
#3 · 2019-04-14 00:22

Assuming your "client class" is really a request handler the client calls, you could do something like this:

from google.appengine.api import users
from google.appengine.api.namespace_manager import set_namespace
from google.appengine.ext import webapp

class ClientClass(webapp.RequestHandler):
  def get(self):
    # For this example let's assume the user_id is your unique id.
    # You could just as easily use a parameter you are passed.
    user = users.get_current_user()
    if user:
      # If there is a user, use their queue.  Otherwise the global queue.
      set_namespace(user.user_id())

    item = QueueItem.pop()
    self.response.out.write(str(item))

    QueueItem.push('The next task.')

Alternatively, you could also set the namespace app-wide.

Once a namespace is set, all calls to the datastore will be "within" that namespace, unless you explicitly specify otherwise. Just note that to fetch and run tasks you have to know the namespace, so you probably want to maintain a list of namespaces in the default namespace for cleanup purposes.
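
The bookkeeping point can be illustrated without the datastore. The sketch below is an in-memory stand-in, not App Engine code: NamespacedQueues and its methods are hypothetical names, a dict of deques plays the role of the per-namespace queues, and a plain list plays the role of the namespace registry that would live in the default namespace.

```python
from collections import deque


class NamespacedQueues(object):
    """In-memory model: one FIFO queue per namespace plus a registry."""

    def __init__(self):
        self._queues = {}            # namespace -> deque of items
        self.known_namespaces = []   # registry, in first-seen order

    def push(self, namespace, data):
        """Append data to the namespace's queue, registering it if new."""
        if namespace not in self._queues:
            self._queues[namespace] = deque()
            self.known_namespaces.append(namespace)
        self._queues[namespace].append(data)

    def pop(self, namespace):
        """Return the oldest item for the namespace, or None if empty."""
        queue = self._queues.get(namespace)
        return queue.popleft() if queue else None

    def drain_all(self):
        """Cleanup pass: visit every registered namespace until empty."""
        drained = []
        for namespace in self.known_namespaces:
            item = self.pop(namespace)
            while item is not None:
                drained.append((namespace, item))
                item = self.pop(namespace)
        return drained
```

The registry is what makes drain_all possible: without it, a cleanup job would have no way to discover which client queues exist. Because pop uses None to signal an empty queue, this model assumes None is never pushed as data.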
