I have a multi-process web server whose processes never end, and I would like to check my code coverage on the whole project in a live environment (not only from tests).
The problem is that, since the processes never end, I don't have a good place to put the cov.start(), cov.stop(), cov.save() hooks.
Therefore, I thought about spawning a thread that, in an infinite loop, saves and combines the coverage data and then sleeps for some time. However, this approach doesn't work: the coverage report seems to be empty, except for the sleep line.
I would be happy to receive any ideas about how to get the coverage of my code, or any advice about why my idea doesn't work. Here is a snippet of my code:
import coverage
cov = coverage.Coverage()
import time
import threading
import os


class CoverageThread(threading.Thread):
    _kill_now = False
    _sleep_time = 2

    @classmethod
    def exit_gracefully(cls):
        cls._kill_now = True

    def sleep_some_time(self):
        time.sleep(CoverageThread._sleep_time)

    def run(self):
        while True:
            cov.start()
            self.sleep_some_time()
            cov.stop()
            if os.path.exists('.coverage'):
                cov.combine()
            cov.save()
            if self._kill_now:
                break

        cov.stop()
        if os.path.exists('.coverage'):
            cov.combine()
        cov.save()

        cov.html_report(directory="coverage_report_data.html")
        print "End of the program. I was killed gracefully :)"
Since you are willing to run your code differently for the test, why not add a way to end the process for the test? That seems like it would be simpler than trying to hack coverage.
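For instance, a minimal sketch of such an exit path, assuming you keep the CoverageThread from the question (the handler name below is made up): install a SIGTERM handler that flips the existing exit flag, so a plain kill lets the thread finish its loop, save the data and write the HTML report.

import signal

def _handle_sigterm(signum, frame):
    # Ask the coverage loop to stop; on its next wake-up it will
    # save/combine the data and write the HTML report.
    CoverageThread.exit_gracefully()

signal.signal(signal.SIGTERM, _handle_sigterm)

Once the report has been written, whatever sent the signal can shut the server down through its normal exit path.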
You can use pyrasite to inject code into the running processes directly, with two small payload programs.
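The original payload programs aren't reproduced here; purely as a sketch of the shape they could take (the file names, the /tmp data file and the __live_coverage__ attribute are my assumptions), one injected payload starts collection inside the target process and a second one stops it and saves the data file:

# start_coverage.py -- injected with: pyrasite <pid> start_coverage.py
import coverage
import __main__

_cov = coverage.Coverage(data_file='/tmp/.coverage.live')
_cov.start()
# stash a reference where the second payload can find it later
__main__.__live_coverage__ = _cov

and

# stop_coverage.py -- injected with: pyrasite <pid> stop_coverage.py
import __main__

_cov = getattr(__main__, '__live_coverage__', None)
if _cov is not None:
    _cov.stop()
    _cov.save()

Keep in mind that coverage started this way only measures code executed after the injection, and it has the same per-thread limitation described in the answer below.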
Another way to go would be to trace the program using lptrace; even though it only prints calls, it can be useful.
Apparently, it is not possible to control coverage very well with multiple Threads. Once a different thread is started, stopping the Coverage object stops all coverage, and start only restarts it in the "starting" Thread. So your code basically stops the coverage after 2 seconds for every Thread other than the CoverageThread.

I played a bit with the API and it is possible to access the measurements without stopping the Coverage object. So you could launch a thread that saves the coverage data periodically, using the API. A first implementation would be something like this one; a more stable version can be found in this GIST. This code basically grabs the info collected by the collector without stopping it. The get_data_dict function takes the dictionary in Coverage.collector and pops the available data, so this should be safe enough that you don't lose any measurement. The report files get updated every _delay seconds.
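The gist is the code to use; just to illustrate the idea, here is a rough sketch of such a logger thread. It is written against coverage 4.x internals (Coverage.collector.data, CoverageData.add_lines/add_arcs, CoverageData.write_file), which are not public API and have changed in later versions; the shutdown method and the .coverage.live file name are also my own choices, so treat the details as assumptions rather than the gist's actual code.

import threading
import time

from coverage import Coverage
from coverage.data import CoverageData

cov = Coverage()
cov.start()


def get_data_dict(d):
    """Pop the measurements gathered so far out of the collector's dict."""
    data = {}
    while True:
        try:
            filename, lines = d.popitem()
        except KeyError:
            return data
        data[filename] = dict(lines)


class CoverageLoggerThread(threading.Thread):
    _delay = 2  # seconds between two exports

    def __init__(self):
        super(CoverageLoggerThread, self).__init__()
        self._data = CoverageData()
        self._kill_now = False

    def shutdown(self):
        self._kill_now = True

    def run(self):
        while not self._kill_now:
            self._export()
            time.sleep(self._delay)
        self._export()  # final flush before the thread exits

    def _export(self):
        # Grab what the collector has measured so far, without stopping it.
        new_data = get_data_dict(cov.collector.data)
        if cov.collector.branch:
            self._data.add_arcs(new_data)
        else:
            self._data.add_lines(new_data)
        # Persist the accumulated data; a report can then be generated
        # from this file with the coverage command line tool.
        self._data.write_file('.coverage.live')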
But if you have multiple processes running, you need to add extra effort to make sure all the processes run the CoverageLoggerThread. This is the patch_multiprocessing function, monkey patched from the coverage monkey patch... The code is in the GIST. It basically replaces the original Process with a custom Process, which starts the CoverageLoggerThread just before running the run method and joins the thread at the end of the process. The script main.py permits launching different tests with threads and processes.
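Again only as a rough outline of that idea (the real implementation is in the gist; this sketch reuses the CoverageLoggerThread and shutdown method from the previous snippet, and the class name LoggedProcess is mine):

import multiprocessing


def patch_multiprocessing():
    """Replace multiprocessing.Process so every child runs the logger."""
    original_process = multiprocessing.Process

    class LoggedProcess(original_process):
        def run(self):
            # CoverageLoggerThread is the class sketched above
            logger = CoverageLoggerThread()
            logger.start()
            try:
                # run the user's target/run() as usual
                original_process.run(self)
            finally:
                # let the logger do a final export, then wait for it
                logger.shutdown()
                logger.join()

    multiprocessing.Process = LoggedProcess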
There are two or three drawbacks to this code that you need to be careful of:

- It is a bad idea to use the combine function concurrently, as it performs concurrent read/write/delete access to the .coverage.* files. This means that the export function is not super safe. It should be alright, as the data is replicated multiple times, but I would do some testing before using it in production.
- Once the data has been exported, it stays in memory. So if the code base is huge, it could eat some resources. It is possible to dump all the data and reload it, but I assumed that if you want to log every 2 seconds, you do not want to reload all the data every time. If you go with a delay in minutes, I would create a new _data every time, using CoverageData.read_file to reload the previous state of the coverage for this process.
- The custom Process will wait for _delay before finishing, as we join the CoverageLoggerThread at the end of the process. So if you have a lot of quick processes, you want to increase the granularity of the sleep to be able to detect the end of the Process more quickly; it just needs a custom sleep loop that breaks on _kill_now.

Let me know if this helps you in some way or if it is possible to improve this gist.
EDIT: It seems you do not need to monkey patch the multiprocessing module to start a logger automatically. Using a .pth file in your Python install, you can use an environment variable to start your logger automatically on new processes. You can then start your coverage logger with COVERAGE_LOGGER_START=1 python main.py
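The original .pth file isn't shown above; here is a sketch of how that mechanism could look (the names coverage_logger.pth and coverage_logger_bootstrap are mine). Lines in a .pth file placed in site-packages that start with "import" are executed at interpreter start-up, so the bootstrap runs in every new Python process and checks the environment variable:

# coverage_logger.pth  (dropped into site-packages; "#" lines are ignored)
import coverage_logger_bootstrap

and

# coverage_logger_bootstrap.py  (hypothetical module somewhere on sys.path)
import os

if os.environ.get('COVERAGE_LOGGER_START'):
    import coverage

    cov = coverage.Coverage()
    cov.start()
    # start the periodic exporter sketched earlier in this answer,
    # e.g. CoverageLoggerThread().start()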