I have a Twisted server I built, and I was wondering what the best way is to limit the number of simultaneous connections.
Is having my Factory return None the best way? When I do this, I get a lot of exceptions like:
exceptions.AttributeError: 'NoneType' object has no attribute 'makeConnection'
I would like some way to have the clients just sit in a queue until the current connection count goes back down, but I don't know how to do that asynchronously.
Currently my factory looks like this:
class HandleClientFactory(Factory):
    def __init__(self):
        self.numConnections = 0

    def buildProtocol(self, addr):
        # Limit the number of connections here.
        if self.numConnections >= Max_Clients:
            logging.warning("Reached maximum Client connections")
            return None
        return HandleClient(self)
This works, but it disconnects clients rather than making them wait, and it also throws a lot of unhandled errors.
You have to build this yourself. Fortunately, the pieces are mostly in place to do so (you could probably ask for slightly more suitable pieces but ...)
First, to avoid the AttributeError (which indeed causes the connection to be closed), be sure to return an IProtocol provider from your buildProtocol method.
class DoesNothing(Protocol):
    pass

class YourFactory(Factory):
    def buildProtocol(self, addr):
        if self.currentConnections < self.maxConnections:
            return Factory.buildProtocol(self, addr)
        protocol = DoesNothing()
        protocol.factory = self
        return protocol
If you use this factory (filling in the missing pieces - eg, initializing maxConnections and tracking currentConnections correctly), then you'll find that clients which connect once the limit has been reached are given the DoesNothing protocol. They can send as much data as they like to this protocol. It will discard it all. It will never send them any data. It will leave the connection open until they close it. In short, it does nothing.
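For concreteness, the missing bookkeeping might look roughly like this - a minimal sketch, not something prescribed above; the maxConnections default and the choice to count only connections that receive real service are assumptions:

from twisted.internet.protocol import Factory, Protocol

class DoesNothing(Protocol):
    pass

class YourFactory(Factory):
    # Stand-in; point this at whatever protocol actually serves clients.
    protocol = Protocol

    def __init__(self, maxConnections=10):
        self.maxConnections = maxConnections
        self.currentConnections = 0

    def buildProtocol(self, addr):
        if self.currentConnections < self.maxConnections:
            # Count only connections that get real service; the matching
            # decrement happens when such a connection is lost.
            self.currentConnections += 1
            return Factory.buildProtocol(self, addr)
        protocol = DoesNothing()
        protocol.factory = self
        return protocol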
However, you also wanted clients to actually receive service once the connection count fell below the limit.
To do this, you need a few more pieces:
- You have to keep any data they might send buffered so it is available to be read when you're ready to read it.
- You have to keep track of the connections so you can begin to service them when the time is ripe.
- You have to begin to service them at said time.
For the first of these, you can use the feature of most transports to "pause":
class PauseTransport(Protocol):
    def makeConnection(self, transport):
        transport.pauseProducing()

class YourFactory(Factory):
    def buildProtocol(self, addr):
        if self.currentConnections < self.maxConnections:
            return Factory.buildProtocol(self, addr)
        protocol = PauseTransport()
        protocol.factory = self
        return protocol
PauseTransport is similar to DoesNothing but with the minor (and useful) difference that as soon as it is connected to a transport it tells the transport to pause. Thus, no data will ever be read from the connection and it will all remain buffered for whenever you're ready to deal with it.
For the next requirement, many possible solutions exist. One of the simplest is to use the factory as storage:
class PauseAndStoreTransport(Protocol):
    def makeConnection(self, transport):
        transport.pauseProducing()
        self.factory.addPausedTransport(transport)

class YourFactory(Factory):
    def buildProtocol(self, addr):
        # As above
        ...

    def addPausedTransport(self, transport):
        self.transports.append(transport)
Again, with the proper setup (eg, initialize the transports attribute), you now have a list of all of the transports which correspond to connections you've accepted above the concurrency limit and which are waiting for service.
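That setup can be as small as the following sketch (the transports attribute name comes from the code above; the rest of the initializer is assumed):

from twisted.internet.protocol import Factory

class YourFactory(Factory):
    def __init__(self, maxConnections=10):
        self.maxConnections = maxConnections
        self.currentConnections = 0
        self.transports = []   # paused transports waiting for a free slot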
For the last requirement, all that is necessary is to instantiate and initialize the protocol that's actually capable of serving your clients. Instantiation is easy (it's your protocol, you probably know how it works). Initialization is largely a matter of calling the makeConnection method:
class YourFactory(Factory):
    def buildProtocol(self, addr):
        # As above
        ...

    def addPausedTransport(self, transport):
        # As above
        ...

    def oneConnectionDisconnected(self):
        self.currentConnections -= 1
        if self.currentConnections < self.maxConnections:
            transport = self.transports.pop(0)
            # 'address' is discussed just below.
            protocol = self.buildProtocol(address)
            protocol.makeConnection(transport)
            transport.resumeProducing()
I've omitted the details of keeping track of the address argument required by buildProtocol (with the transport carried from its point of origin to this part of the program, it should be clear how to do something similar for the original address value if your program actually wants it).
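If your program does want it, one illustrative approach (not part of the answer above; the addr attribute, the tuple layout, and the queue-not-empty guard are assumptions) is to stash the address on the pausing protocol and queue it next to its transport:

from twisted.internet.protocol import Factory, Protocol

class PauseAndStoreTransport(Protocol):
    addr = None   # filled in by buildProtocol below; the name is arbitrary

    def makeConnection(self, transport):
        transport.pauseProducing()
        self.factory.addPausedTransport(transport, self.addr)

class YourFactory(Factory):
    # Assumes maxConnections, currentConnections, and transports are
    # initialized as sketched earlier.
    def buildProtocol(self, addr):
        if self.currentConnections < self.maxConnections:
            self.currentConnections += 1
            return Factory.buildProtocol(self, addr)
        protocol = PauseAndStoreTransport()
        protocol.factory = self
        protocol.addr = addr
        return protocol

    def addPausedTransport(self, transport, addr):
        # Queue the address alongside its transport so it is available
        # when the connection is eventually served.
        self.transports.append((transport, addr))

    def oneConnectionDisconnected(self):
        self.currentConnections -= 1
        if self.currentConnections < self.maxConnections and self.transports:
            transport, addr = self.transports.pop(0)
            protocol = self.buildProtocol(addr)
            protocol.makeConnection(transport)
            transport.resumeProducing()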
Apart from that, all that happens here is you take the next queued transport (you could use a different scheduling algorithm if you want, eg LIFO) and hook it up to a protocol of your choosing just as Twisted would do. Finally, you undo the earlier pause operation so data will begin to flow.
Or... almost. This would be pretty slick except Twisted transports don't actually expose any way to change which protocol they deliver data to. Thus, as written, data from clients will actually be delivered to the original PauseAndStoreTransport protocol instance. You can hack around this (and "hack" is clearly the right word). Store both the transport and the PauseAndStoreTransport instance in the list on the factory and then:
def oneConnectionDisconnected(self):
    self.currentConnections -= 1
    if self.currentConnections < self.maxConnections:
        originalProtocol, transport = self.transports.pop(0)
        newProtocol = self.buildProtocol(address)

        # Redirect events from the paused protocol to the new one.
        originalProtocol.dataReceived = newProtocol.dataReceived
        originalProtocol.connectionLost = newProtocol.connectionLost

        newProtocol.makeConnection(transport)
        transport.resumeProducing()
Now the object that the transport wants to call methods on has had its methods replaced by those from the object that you want the methods called on. Again, this is clearly a hack. You can probably put together something less hackish if you want (eg, a third protocol class that explicitly supports delegating to another protocol). The idea will be the same - it'll just be more wear on your keyboard. For what it's worth, I suspect that it may be both easier and less typing to do something similar using Tubes but I'll leave an attempt at a solution based on that library to someone else for now.
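For example, such a delegating protocol might look roughly like this (a sketch of that less hackish idea, not code from Twisted or from the answer above; the class and method names are made up):

from twisted.internet.protocol import Protocol

class DelegatingProtocol(Protocol):
    # Pauses its transport, waits in the factory's queue, and forwards
    # events to a real protocol once one is attached.
    delegate = None

    def makeConnection(self, transport):
        Protocol.makeConnection(self, transport)   # sets self.transport
        transport.pauseProducing()
        self.factory.addPausedTransport(self, transport)

    def dataReceived(self, data):
        if self.delegate is not None:
            self.delegate.dataReceived(data)

    def connectionLost(self, reason):
        if self.delegate is not None:
            self.delegate.connectionLost(reason)

    def activate(self, realProtocol):
        # Hand the connection over to the real protocol and resume reading.
        self.delegate = realProtocol
        realProtocol.makeConnection(self.transport)
        self.transport.resumeProducing()

The factory's oneConnectionDisconnected would then call activate on the queued instance with a freshly built protocol, instead of patching the old instance's methods.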
I've avoided addressing the problem of keeping currentConnections properly up to date. Since you already had numConnections in your question, I'm assuming you know how to manage that part. All I've done in the last step here is suppose that the way you do the decrement step is by calling oneConnectionDisconnected on the factory.
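For instance, assuming your HandleClient protocol (from the question) keeps a reference to its factory, the wiring could be as simple as this sketch:

from twisted.internet.protocol import Protocol

class HandleClient(Protocol):
    def connectionLost(self, reason):
        # Free the slot and let the factory promote a queued connection.
        self.factory.oneConnectionDisconnected()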
I've also avoided addressing the case where a queued connection gets bored and goes away. This will mostly work as written - Twisted won't notice the connection was closed until you call resumeProducing, and then connectionLost will be called on your application protocol. This should be fine, since your protocol needs to handle lost connections anyway.