We have a RESTful(-ish) twisted application that uses txpostgres to access a postgres db. Currently, we generate new txpostgres.Connection instances every time a client pings the server for a db call. This is inefficient and results in our db quickly getting overwhelmed. I've been trying to adapt this to use txpostgres.ConnectionPool instead, but am running into trouble. Right now I have something that looks like this:

class DBTester(object):
    def __init__(self):
        self.cfg = load_config('local')  # load the db settings from a JSON file
        self.pool = ConnectionPool(None, min=1, **self.cfg) # create the pool

    @defer.inlineCallbacks
    def get_pool(self):
        yield self.pool.start()
        defer.returnValue(self.pool)


class DBT(object):
    def __init__(self):
        self.db = DBTester()

    @defer.inlineCallbacks
    def t(self):
        conn = yield self.db.get_pool()
        res = yield conn.runQuery('select * from clients')
        println('DBT.t result: {}'.format(res))


if __name__ == "__main__":
    dbt = DBT()
    dbt.t()
    dbt.t()

    reactor.run()

The issue is the timing of the pool.start() call. If I put it in DBTester.__init__, I get psycopg2.OperationalError: asynchronous connection attempt underway. If I put it in DBTester.get_pool, one dbt.t() call works, and the other(s) fail with exceptions.AttributeError: 'NoneType' object has no attribute 'runQuery'. I've been struggling with this basically all day and haven't been able to crack it, nor have I been able to find much online.

I really just need a pointer to some minimal example of how ConnectionPool is used. Any suggestions?

  • Why do you have the pool in the DBTester class? Is there a reason for that? Commented Oct 3, 2014 at 13:42

2 Answers

It sounds like your problem is not with txpostgres but more with twisted and the async way of thinking.

exceptions.AttributeError: 'NoneType' object has no attribute 'runQuery' means: You tried to send SQL queries to the database before a connection was made. That cannot work, so I will raise an exception to let dear User know about this madness.

So, this could happen if you had something like

pool = ConnectionPool(None, min=1)
d1 = pool.start()
d2 = pool.runQuery('select tablename from pg_tables')

This code creates two deferreds and throws them at the reactor. Nothing makes d2 wait for d1, so the query can be attempted before the connection is ready, and when that happens you get the error.

txpostgres.txpostgres.AlreadyConnected means: Pretty self-explanatory. It makes no sense to start a pool that is already started.

psycopg2.OperationalError: asynchronous connection attempt underway means:
I was in the middle of setting up a nice async database connection when you started executing SQL statements. The database connection was not ready yet, so the SQL queries were not executed. That makes me sad. I think I will throw an operational error so dear User knows that the statement failed.

Okay, so we need a way to make sure that the connection is established before we send SQL queries to the database. Below is a code example that uses callbacks to achieve this.

from txpostgres.txpostgres import ConnectionPool
from twisted.internet import reactor, defer
from twisted.python import log, util


class SomeClass(object):

    pool = ConnectionPool(
        None,
        min=1,
        user="user",
        password="pass",
        host='host.com')

    @defer.inlineCallbacks
    def fetch_tables(self):
        res = yield self.pool.runQuery('select tablename from pg_tables')
        defer.returnValue(res)


if __name__ == "__main__":

    def querydb(n=10):
        dl = []
        for i in range(n):
            d = s.fetch_tables()
            d.addCallback(lambda tables: util.println(len(tables)))
            dl.append(d)
        return defer.DeferredList(dl)

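    s = SomeClass()
    # Start the pool first; everything else hangs off the Deferred it returns.
    d_startpool = s.pool.start()
    d_startpool.addCallback(lambda _: querydb())       # query only once connected
    d_startpool.addCallback(lambda _: s.pool.close())  # runs after all queries finish
    d_startpool.addErrback(log.err)
    d_startpool.addBoth(lambda _: reactor.stop())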

    reactor.run()

Hope this helps.

I don't know if this counts as best practices, but here's what we're going with:

## dbutil.py
import json

from twisted.internet import defer
from txpostgres import txpostgres

class DBConnection(object):
    def __init__(self, cfg_name):
        self.cfg_name = cfg_name
        self.cfg = self.load_config(self.cfg_name)
        self.pool = txpostgres.ConnectionPool(None, min=5, **self.cfg)

    @staticmethod
    def load_config(name):
        with open('config.json') as json_file:
            cfg = json.load(json_file)
        return cfg.get(name)

    @defer.inlineCallbacks
    def get_pool(self):
        try:
            yield self.pool.start()
        except txpostgres.AlreadyConnected:
            pass
        defer.returnValue(self.pool)


@defer.inlineCallbacks
def db_factory(cfg_name):
    db = DBConnection(cfg_name)
    db.pool = yield db.get_pool()
    defer.returnValue(db)


## basehandler.py
def __init__(self, name=None, db=None):
    resource.Resource.__init__(self)
    self.name = name
    self.db = db
    self.pool = self.db.pool

@defer.inlineCallbacks
def runQuery(self, *args, **kwargs):
    res = yield self.pool.runQuery(*args, **kwargs)
    defer.returnValue(res)


## server.py
@defer.inlineCallbacks
def init_site(db):
    db = yield db_factory(db)
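    root = RootURLHandler(db)
    # Assumed: site wraps the root resource (needs `from twisted.web import server`).
    site = server.Site(root)
    reactor.listenTCP(SERVER_PORT, site)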

def main(db):
    log.startLogging(LogFile('server.log', '.', maxRotatedFiles=5))
    init_site(db)
    reactor.run()

The key, perhaps unsurprisingly, was making site initialization a deferred contingent upon the db stuff going through.

2 Comments

I really think you should reconsider. Parts of this code are horrible, sorry to say, especially the except txpostgres.AlreadyConnected part. But yeah, it works. It would never get through QA where I work, though.
This was pre-QA code--nobody had answered my question and I had gotten an MVP, so I thought I'd do the right thing and share what I'd learned on my own.
