我似乎无法在此环绕的问题我的头。 我正在关注的编写测试我的渠道,消费者在文档描述 。 我通常会使用Django的默认单元测试,但由于信道需要使用pytest,我使用它,但还是会喜欢保持编写测试的类结构。
我的问题与写作类似单元测试,我会用初始化测试属性设置方法。 我试过的方法灯具 ,灯具类使用autouse =真 ,但他们都不似乎工作,因为我无法访问的测试方法的实例属性。 最后,我决定写set_up
实例方法和手动调用它为每个类。 这是我到目前为止有:
@pytest.mark.django_db
@pytest.mark.asyncio
class TestChatConsumer(object):
@database_sync_to_async
def set_up(self):
self.user = UserFactory()
self.api_client = APIClientFactory()
self.token = TokenFactory(user=self.user)
self.credentials = {
'AUTHORIZATION': self.token.key,
...
}
self.credentials_params = '&'.join(['{}={}'.format(k, v) for k, v in self.credentials.items()])
self.authless_endpoint = '/api/v1/ws/'
self.endpoint = self.authless_endpoint + '?' + self.credentials_params
async def test_connect_without_credentials(self):
await self.set_up()
communicator = WebsocketCommunicator(application, self.authless_endpoint)
connected, subprotocol = await communicator.connect()
assert not connected
async def test_connect_with_credentials(self):
await self.set_up()
communicator = WebsocketCommunicator(application, self.endpoint)
connected, subprotocol = await communicator.connect()
assert connected
问题是,我不断收到psycopg2.InterfaceError: connection already closed
,当我试图消费者访问数据库,在它的内部代码connect
方法。 它采用database_sync_to_async
和工作得很好,当我手动测试它。
具体而言,在该线路connect
是引发错误的方法是这样的:
await database_sync_to_async(user.inbox.increment_connections)().
不知道到底是什么问题作为database_sync_to_async
应妥善清理旧的连接,使新的可适当创建。
任何帮助将是非常赞赏。