-->

What's a good approach to managing the db conn

Posted 2019-01-18 12:38

Question:

I'm just learning Google App Engine and am trying to figure out a good approach to managing my database connection to a Google Cloud SQL instance (if you haven't used GC-SQL, basically, it's MySQL in the cloud, with a few limitations).

I'm using the Python (2.7) GAE environment with the webapp2 framework for handling requests. I know the FAQ recommends making a new connection to the DB with each request, but I don't know what the recommended way of closing the connection is. Each time I try to drop tables during development, the GC-SQL instance hangs and "show processlist" shows a bunch of processes (likely because I'm not closing the DB), one of which is waiting for a lock (likely the process trying to drop the tables). This is annoying and forces me to restart the GC-SQL instance (like restarting the mysql-server service, I imagine). There are also occasional DB hiccups that I believe are related to the fact that I'm not really closing my DB connection.

So, for example, should I have a destructor on my webapp2.RequestHandler subclass instance to disconnect from the DB? GAE objects seem to be cached sometimes, so that's also something to consider. I suppose I could just connect/query/disconnect for each query, but this seems suboptimal.

I know this is a vague question, but I'm hoping someone who's played in this area can throw some tips my way.

Thanks in advance!

Update: I tried implementing a wrapper around methods that need a cursor, using Shay's answer as a starting point. I'm getting GAE errors. Here's a new question specific to that: What are the connection limits for Google Cloud SQL from App Engine, and how to best reuse DB connections?

Answer 1:

I'm not familiar with Google Cloud SQL, but couldn't you use a WSGI middleware to open and close the connection?
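
To make the middleware idea concrete, here is a minimal sketch; I have not run it on App Engine. `DBConnectionMiddleware`, the `connect` factory argument, and the `myapp.db_conn` environ key are names invented for illustration, not part of webapp2 or of any Google API. The connection is opened before the wrapped app runs and closed in a `finally`, so it is released even when a handler raises.

```python
class DBConnectionMiddleware(object):
    """WSGI middleware: one DB connection per request, always closed."""

    ENVIRON_KEY = 'myapp.db_conn'  # hypothetical key; pick your own

    def __init__(self, app, connect):
        self.app = app
        # `connect` is any zero-argument callable returning an object
        # with a close() method (for example a DB-API connection).
        self.connect = connect

    def __call__(self, environ, start_response):
        conn = self.connect()
        environ[self.ENVIRON_KEY] = conn
        try:
            result = self.app(environ, start_response)
            try:
                # Materialize the body while the connection is still open,
                # in case the wrapped app produces its output lazily.
                body = list(result)
            finally:
                close = getattr(result, 'close', None)
                if close is not None:
                    close()
            return body
        finally:
            # Runs on success and on error alike.
            conn.close()
```

With webapp2 you would wrap the `WSGIApplication` object in this class and read the connection from `self.request.environ['myapp.db_conn']` inside a handler. Buffering the whole body is a simplification that is fine for small pages but not for streaming responses.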



Answer 2:

Here is a complete version of the helloworld example app from the Getting Started Guide. It is based on snippets from Shay Erlichmen and JJC, but this version is thread-safe.

You can use it like this:

  @with_db_cursor(do_commit = True)
  def get(self, cursor):
    cursor.execute('SELECT guestName, content, entryID FROM entries')

app.yaml

application: helloworld
version: 1
runtime: python27
api_version: 1
threadsafe: true

handlers:
- url: /.*
  script: helloworld.app

helloworld.py

import cgi
import logging
import os
import threading
import webapp2

from google.appengine.api import rdbms

_INSTANCE_NAME = '<name goes here>'  # placeholder: put your instance name here

def _db_connect():
  return rdbms.connect(instance=_INSTANCE_NAME, database='guestbook')

_mydata = threading.local()

def with_db_cursor(do_commit = False):
  """ Decorator for managing DB connection by wrapping around web calls.

  Stores connections and open cursor count in a threadlocal
  between calls.  Sets a cursor variable in the wrapped function. Optionally
  does a commit.  Closes the cursor when wrapped method returns, and closes
  the DB connection if there are no outstanding cursors.

  If the wrapped method has a keyword argument 'existing_cursor', whose value
  is non-False, this wrapper is bypassed, as it is assumed another cursor is
  already in force because of an alternate call stack.
  """
  def method_wrap(method):
    def wrap(self, *args, **kwargs):
      if kwargs.get('existing_cursor', False):
        # Bypass everything if method called with existing open cursor.
        return method(self, None, *args, **kwargs)

      if not hasattr(_mydata, 'conn') or not _mydata.conn:
        _mydata.conn = _db_connect()
        _mydata.ref = 0
        _mydata.commit = False

      conn = _mydata.conn
      _mydata.ref = _mydata.ref + 1

      try:
        cursor = conn.cursor()
        try:
          result = method(self, cursor, *args, **kwargs)
          if do_commit or _mydata.commit:
            _mydata.commit = False
            conn.commit()
          return result
        finally:
          cursor.close()
      finally:
        _mydata.ref = _mydata.ref - 1
        if _mydata.ref == 0:
          _mydata.conn = None
          logging.info('Closing conn')
          conn.close()
    return wrap
  return method_wrap


class MainPage(webapp2.RequestHandler):
  @with_db_cursor(do_commit = True)
  def get(self, cursor):
        cursor.execute('SELECT guestName, content, entryID FROM entries')
        self.response.out.write("""
          <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">
          <html xmlns="http://www.w3.org/1999/xhtml" lang="en" xml:lang="en">
            <head>
               <title>My Guestbook!</title>
            </head>
            <body>""")
        self.response.out.write("""
              <table style="border: 1px solid black">
                <tbody>
                  <tr>
                    <th width="35%" style="background-color: #CCFFCC; margin: 5px">Name</th>
                    <th style="background-color: #CCFFCC; margin: 5px">Message</th>
                    <th style="background-color: #CCFFCC; margin: 5px">ID</th>
                  </tr>""")
        for row in cursor.fetchall():
          self.response.out.write('<tr><td>')
          self.response.out.write(cgi.escape(row[0]))
          self.response.out.write('</td><td>')
          self.response.out.write(cgi.escape(row[1]))
          self.response.out.write('</td><td>')
          self.response.out.write(row[2])
          self.response.out.write('</td></tr>')

        self.response.out.write("""
          </tbody>
            </table>
              <br /> No more messages!
              <br /><strong>Sign the guestbook!</strong>
              <form action="/sign" method="post">
              <div>First Name: <input type="text" name="fname" style="border: 1px solid black"></div>
              <div>Message: <br /><textarea name="content" rows="3" cols="60"></textarea></div>
              <div><input type="submit" value="Sign Guestbook"></div>
            </form>
          </body>
        </html>""")

class Guestbook(webapp2.RequestHandler):
  @with_db_cursor(do_commit = True)
  def post(self, cursor):
    fname = self.request.get('fname')
    content = self.request.get('content')
    # Note that the only format string supported is %s
    cursor.execute('INSERT INTO entries (guestName, content) VALUES (%s, %s)', (fname, content))

    self.redirect("/")

app = webapp2.WSGIApplication(
    [('/', MainPage),
     ('/sign', Guestbook)],
    debug=True)
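
The thread safety above comes from keeping the connection in `threading.local()`. As a small illustration of that property only, here is a sketch that runs without App Engine: a plain `object()` stands in for the result of `rdbms.connect(...)`, and `get_conn` and `collect_conns` are made-up helper names. Each thread sees its own attribute, so two requests served by different threads never share a connection, while repeated calls within one thread reuse the same one.

```python
import threading

_local = threading.local()

def get_conn():
    """Return this thread's 'connection', creating it on first use."""
    if not hasattr(_local, 'conn'):
        # Stand-in for a real connect call: one fresh object per thread.
        _local.conn = object()
    return _local.conn

def collect_conns(n_threads=4):
    """Call get_conn() twice in each thread and return what each one saw."""
    results = {}

    def worker(i):
        results[i] = (get_conn(), get_conn())

    threads = [threading.Thread(target=worker, args=(i,))
               for i in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results

seen = collect_conns()
# Same object within a thread, different objects across threads.
same_within_thread = all(a is b for a, b in seen.values())
distinct_across_threads = len(set(id(a) for a, _ in seen.values())) == len(seen)
```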


Answer 3:

I wrote a decorator to handle the SQL connection; feel free to flame :)

# Here is how you use the decorator defined below;
# the open, commit, and close are all done by the decorator.
@need_cursor(do_commit = True)
def get(self, cursor, request): # cursor param is added by the decorator
    execute_sql(cursor, sql)

import os

def need_cursor(do_commit = False):
    def method_wrap(method):
        def wrap(self, *args, **kwargs):
            conn = os.environ.get("__data_conn")

            # Recycle the connection for the current request.
            # For some reason threading.local() didn't work,
            # and yes, os.environ is supposed to be thread safe.
            # Caveat: a standard os.environ accepts only string values, so
            # storing a connection and counters in it depends on the runtime.
            if not conn:
                conn = create_connection() # You need to implement this
                os.environ["__data_conn"] = conn
                os.environ["__data_conn_ref"] = 1
            else:
                os.environ["__data_conn_ref"] += 1

            try:
                cursor = conn.cursor()
                try:
                    # Pass self first, then the injected cursor.
                    result = method(self, cursor, *args, **kwargs)

                    if do_commit or os.environ.get("__data_conn_commit"):
                        os.environ["__data_conn_commit"] = False
                        conn.commit()

                    return result
                finally:
                    cursor.close()
            finally:
                os.environ["__data_conn_ref"] -= 1
                if os.environ["__data_conn_ref"] == 0:
                    os.environ["__data_conn"] = None
                    conn.close()

        return wrap

    return method_wrap


Answer 4:

This is my approach, which also handles possible exceptions. I use it in a production environment and it works well:


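import functools
import logging
import os

import MySQLdb
from MySQLdb import MySQLError, OperationalError

# `env` is the author's own settings module (not shown in this answer).

def _create_connection(schema):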

    if (os.getenv('SERVER_SOFTWARE') and
        os.getenv('SERVER_SOFTWARE').startswith('Google App Engine/')):
        socket = '/cloudsql/%s' % env.DB_INSTANCE_NAME
        return MySQLdb.connect(unix_socket=socket, user=env.DB_APP_USER,
                               passwd=env.DB_APP_PASS, db=schema)
    else:
        return MySQLdb.connect(host='127.0.0.1', port=3306,
                               user=env.DB_APP_USER, passwd=env.DB_APP_PASS,
                               db=schema)


def with_db(commit=False, schema=env.DB_SCHEMA_NAME):

    def method_wrap(method):
        @functools.wraps(method)
        def wrap(self, *args, **kwds):
            # If needed, a connection pool can be added here.
            connection = _create_connection(schema)

            try:
                cur = connection.cursor()
                self.cur = cur
                self.conn = connection

                result = method(self, *args, **kwds)

                if commit:
                    connection.commit()

            except OperationalError as e:

                logging.error('Operational error.\r\nSQL exception: {},\r\n'
                              'Last Query: {}'.format(e, cur._last_executed))

                if commit and connection.open:
                    connection.rollback()
                raise

            except MySQLError as e:

                try:
                    warns = self.conn.show_warnings()
                    error = self.conn.error()
                except Exception:
                    warns = ""
                    error = ""

                logging.error('Trying to roll back transaction.\r\nSQL exception: {},\r\n'
                              'Last Query: {},\r\nConn warn: {},\r\nError: {}'
                              .format(e, cur._last_executed, warns, error))


                if commit and connection.open:
                    connection.rollback()
                raise

            except Exception as e:
                logging.error('Trying to roll back transaction. Non SQL exception: {0}'.format(e))

                if commit and connection.open:
                    connection.rollback()
                raise

            finally:
                connection.close()

            return result
        return wrap
    return method_wrap

You can use it like this:


@with_db(commit=True)
def update_user_phone(self, user, phone):
    self.cur.execute(_SQL_UPDATE_USER_PHONE, (phone, user.id))

    # add or replace existing user to cache
    user.phone = phone
    self._update_user_cache(user)
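
The comment in `wrap` says a connection pool could be added where the connection is created. As a rough idea of what that might look like, here is a minimal sketch. `SimplePool`, `acquire`, and `release` are invented names, not something from MySQLdb or App Engine, and this is not the code running in production. `factory` is any zero-argument callable that returns a connection, for example `lambda: _create_connection(schema)`.

```python
try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2

class SimplePool(object):
    """Keeps up to `size` idle connections around for reuse."""

    def __init__(self, factory, size=2):
        self._factory = factory
        self._idle = queue.Queue(maxsize=size)

    def acquire(self):
        try:
            # Reuse an idle connection if one is available.
            return self._idle.get_nowait()
        except queue.Empty:
            # None idle: open a new one.
            return self._factory()

    def release(self, conn):
        try:
            # Keep the connection for the next caller.
            self._idle.put_nowait(conn)
        except queue.Full:
            # Pool already holds `size` idle connections: really close it.
            conn.close()
```

In the decorator, `pool.acquire()` would replace `_create_connection(schema)` and `pool.release(connection)` would replace `connection.close()` in the `finally` block. A real pool would also need to discard connections the server has already dropped and roll back any open transaction before reuse, which this sketch leaves out.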