How can I provide shared state to my Flask app wit

Posted 2020-07-27 03:11

I want to provide shared state for a Flask app which runs with multiple workers, i.e. multiple processes.

To quote this answer from a similar question on this topic:

You can't use global variables to hold this sort of data. [...] Use a data source outside of Flask to hold global data. A database, memcached, or redis are all appropriate separate storage areas, depending on your needs.

(Source: Are global variables thread safe in flask? How do I share data between requests?)

My question is on that last part regarding suggestions on how to provide the data "outside" of Flask. Currently, my web app is really small and I'd like to avoid requirements or dependencies on other programs. What options do I have if I don't want to run Redis or anything else in the background but provide everything with the Python code of the web app?

2 answers
Juvenile、少年°
#2 · 2020-07-27 03:44

If your webserver's worker type is compatible with the multiprocessing module, you can use multiprocessing.managers.BaseManager to provide a shared state for Python objects. A simple wrapper could look like this:

from multiprocessing import Lock
from multiprocessing.managers import BaseManager

class SharedState:
    def __init__(self, address, authkey):
        self._data = {}
        self._lock = Lock()
        self._manager = BaseManager(address, authkey)
        self._manager.register('get', self._get)
        self._manager.register('set', self._set)
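        try:
            # get_server() binds the address and raises OSError if another
            # worker has already started the server there
            self._manager.get_server()
            self._manager.start()
        except OSError: # Address already in use
            self._manager.connect()
    def __getattr__(self, name):
        # Only called when normal lookup fails; private names are never forwarded
        if name.startswith('_'):
            raise AttributeError(name)
        return self._manager.get(name)._getvalue()
    def __setattr__(self, name, value):
        # Private names are stored on the instance, everything else on the server
        if name.startswith('_'):
            return object.__setattr__(self, name, value)
        return self._manager.set(name, value)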
    def _get(self, name):
        return self._data[name]
    def _set(self, name, value):
        with self._lock:
            self._data[name] = value
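
To make the attribute forwarding in the wrapper easier to follow, here is a minimal sketch of the same __getattr__/__setattr__ pattern with a plain dict standing in for the manager. AttributeForwarder and its backend argument are names made up for this illustration; nothing here talks to another process.

```python
class AttributeForwarder:
    """Hypothetical stand-in for SharedState: a dict replaces the manager."""

    def __init__(self, backend):
        # Underscore names take the object.__setattr__ branch below,
        # so this is stored on the instance itself
        self._backend = backend

    def __getattr__(self, name):
        # Python calls __getattr__ only after normal attribute lookup fails
        if name.startswith('_'):
            raise AttributeError(name)
        try:
            return self._backend[name]
        except KeyError:
            raise AttributeError(name) from None

    def __setattr__(self, name, value):
        if name.startswith('_'):
            object.__setattr__(self, name, value)  # keep private names local
        else:
            self._backend[name] = value            # forward public names


store = {}
fwd = AttributeForwarder(store)
fwd.number = 0
fwd.number += 1  # one forwarded read followed by one forwarded write
```

Note that the += on the last line is two separate operations (a get, then a set), which matters once several processes share the backend.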

You can assign your data to attributes of an instance of SharedState to make it accessible across processes:

import numpy

ADDRESS = '127.0.0.1', 35791
AUTHKEY = b'secret'
ss = SharedState(ADDRESS, AUTHKEY)

ss.number = 0
ss.text = 'Hello World'
ss.array = numpy.array([1, 2, 3])

I have tested this with both the fork and spawn start methods; it works on both Linux and Windows.

Flask example:

The following Flask app uses a global variable to store a counter number:

from flask import Flask
app = Flask(__name__)

number = 0

@app.route('/')
def counter():
    global number
    number += 1
    return str(number)

This works when using only one worker (gunicorn -w 1 server:app). With multiple workers (gunicorn -w 4 server:app), it becomes apparent that number is not shared state but a separate variable in each worker process.
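
The effect can be reproduced without gunicorn or Flask. The following is only a rough sketch: it starts two fresh Python interpreters as stand-ins for worker processes (gunicorn forks its workers instead, but each worker still ends up with private memory), and each one "handles" three requests with the same global-variable counter. WORKER_SCRIPT, run_worker and run_workers are names invented for this illustration.

```python
import subprocess
import sys

# Stand-in for one worker process: same global counter as the Flask app above
WORKER_SCRIPT = """
number = 0

def counter():
    global number
    number += 1
    return number

for _ in range(3):      # pretend three requests hit this worker
    last = counter()
print(last)
"""


def run_worker():
    """Run the script in a separate interpreter and return its final count."""
    result = subprocess.run(
        [sys.executable, '-c', WORKER_SCRIPT],
        capture_output=True, text=True, check=True,
    )
    return int(result.stdout)


def run_workers(count):
    return [run_worker() for _ in range(count)]
```

run_workers(2) returns [3, 3]: each process counted its own three requests, and no process ever sees a total of 6.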

Instead, with SharedState, the app looks like this:

from flask import Flask
app = Flask(__name__)

ADDRESS = '127.0.0.1', 35791
AUTHKEY = b'secret'
ss = SharedState(ADDRESS, AUTHKEY)
ss.number = 0

@app.route('/')
def counter():
    ss.number += 1
    return str(ss.number)

This works with any number of workers, like gunicorn -w 4 server:app.

唯我独甜
#3 · 2020-07-27 04:04

Your example is a bit magic for me! I'd suggest reusing the magic already in the multiprocessing codebase in the form of a Namespace. I've attempted to make the following code compatible with spawn servers (i.e. MS Windows), but I only have access to Linux machines, so I can't test it there.

Start by pulling in dependencies, defining our custom Manager, and registering a method to get out a Namespace singleton:

from multiprocessing.managers import BaseManager, Namespace, NamespaceProxy

class SharedState(BaseManager):
    _shared_state = Namespace(number=0)

    @classmethod
    def _get_shared_state(cls):
        return cls._shared_state

SharedState.register('state', SharedState._get_shared_state, NamespaceProxy)

This might need to be more complicated if creating the initial state is expensive and hence should only be done when it's needed. Note that the OP's version of initialising state during process startup will cause everything to reset if gunicorn starts a new worker process later, e.g. after killing one due to a timeout.
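
One way the "only when it's needed" variant could look is sketched below. This is an untested-in-production sketch, not part of the original answer: LazySharedState and build_initial_state are hypothetical names, and build_initial_state stands in for whatever expensive setup the real app would do.

```python
import threading
from multiprocessing.managers import BaseManager, Namespace, NamespaceProxy


def build_initial_state():
    """Hypothetical stand-in for an expensive setup step."""
    return Namespace(number=0)


class LazySharedState(BaseManager):
    _shared_state = None            # nothing is built at import time
    _init_lock = threading.Lock()   # the manager server handles each client in its own thread

    @classmethod
    def _get_shared_state(cls):
        # Runs in the server process; builds the state on the first request only
        with cls._init_lock:
            if cls._shared_state is None:
                cls._shared_state = build_initial_state()
            return cls._shared_state


LazySharedState.register('state', LazySharedState._get_shared_state, NamespaceProxy)
```

Because the state is created inside the server process on first access, a worker that starts later just connects and sees the existing values instead of resetting them.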

Next I define a function to get access to this shared state, similar to how the OP does it:

def shared_state(address, authkey):
    manager = SharedState(address, authkey)
    try:
        manager.get_server()  # raises if another server started
        manager.start()
    except OSError:
        manager.connect()
    return manager.state()

Though I'm not sure I'd recommend doing things like this. When gunicorn starts, it spawns lots of processes that all race to run this code, and it wouldn't surprise me if this could go wrong sometimes. Also, if gunicorn happens to kill off the server process (e.g. because of a timeout), every other process will start to fail.

That said, if we wanted to use this we would do something like:

ss = shared_state('server.sock', b'noauth')

ss.number += 1

This uses Unix domain sockets (passing a string rather than a tuple as an address) to lock this down a bit more.

Also note that this has the same race conditions as the OP's code: incrementing a number will cause the value to be transferred to the worker's process, where it is incremented and then sent back to the server. I'm not sure what the _lock is supposed to be protecting, but I don't think it'll do much.
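
A possible way around that race is to do the whole read-modify-write on the server, so clients only ever send "increment" and get the new value back. The sketch below is one way to do that under some assumptions: Counter, CounterManager, serve_in_background and connect are names invented here, and to keep the demo self-contained the server runs in a background thread of the same process, whereas in the code above it runs in its own process via manager.start().

```python
import threading
from multiprocessing.managers import BaseManager


class Counter:
    """Hypothetical shared object; all mutation happens where the server runs."""

    def __init__(self):
        self._value = 0
        # The manager server handles each client connection in its own thread,
        # so a threading.Lock is enough to serialise increments
        self._lock = threading.Lock()

    def increment(self, amount=1):
        with self._lock:
            self._value += amount
            return self._value

    def get(self):
        with self._lock:
            return self._value


_counter = Counter()  # singleton living in whichever process runs the server


def _get_counter():
    return _counter


class CounterManager(BaseManager):
    pass


CounterManager.register('counter', callable=_get_counter)


def serve_in_background(authkey):
    """Demo helper: serve from a daemon thread on a free local port."""
    server = CounterManager(address=('127.0.0.1', 0), authkey=authkey).get_server()
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server.address  # includes the port the OS actually picked


def connect(address, authkey):
    """Connect as a client and return a proxy to the shared counter."""
    manager = CounterManager(address=address, authkey=authkey)
    manager.connect()
    return manager.counter()
```

In a view function this would be used as counter.increment() in place of ss.number += 1, since the proxy call returns the value after the increment. The startup race and the "server process gets killed" problem described above are not addressed by this sketch.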
