I have a CLI script which logs all of its activity to a log file. One of the functions of the CLI uploads a large file by splitting it into pieces and uploading them in parallel. On Linux the whole thing works like a charm, but on Windows I can't seem to stream the log entries of the child process (_upload_for_multipart) using a StreamHandler from the logging module. The logger.info statements in _upload_for_multipart are correctly written to the log file (my_logfile.txt), but they are not streamed to the terminal when the verbose argument of the CLI is chosen. All other statements (in other functions) are streamed as well as logged. Any help?
Below is a complete working example of the problem I face. Apart from filechunkio (used to read the file in chunks), you will not need any extra libraries to run it.
import argparse, glob, logging, math, os
from timeit import default_timer as timer
from filechunkio.filechunkio import FileChunkIO
from multiprocessing import cpu_count, Pool, freeze_support, current_process
from sys import exit, exc_info, argv, stdout, version_info, platform
from time import mktime, strptime
logger = None
def _upload_for_multipart(keyname, offset, multipart, part_num, bytes, parts):
    try:
        with FileChunkIO(keyname, 'r', offset=offset, bytes=bytes) as fp:
            try:
                start = timer()
                logger.info('Uploading part {0}/{1}'.format(part_num, parts))
                logger.info('Uploading in MP')
                end = timer()
            except Exception as e:
                logger.error('Some error occurred')
                exit()
            logger.info('UPLOADED part {0}/{1} time = {2:0.1f}s Size: {3}'.format(part_num, parts, (end - start), bytes))
    except Exception as e:
        logger.error('FAILED uploading {0}: {1}'.format(keyname, e))
        exit(1)
def _upload_part(argFile, argBucket, **core_chunk):
    file_path = argFile
    bucket_name = argBucket
    file_name = os.path.basename(file_path)
    source_size = os.stat(file_path).st_size
    chunk_size = max(int(math.sqrt(5242880) * math.sqrt(source_size)), 5242880)
    chunk_amount = int(math.ceil(source_size / float(chunk_size)))
    #mp = s3_bucket.initiate_multipart_upload( file_name )
    mp = ''
    logger.info('Initiate multipart upload')
    logger.info('File size of {0} is {1}. Parallel uploads will be used to speed up the process'
                .format(file_name, source_size))
    start_time = timer()
    pool = Pool(processes=1, initializer=init_log, initargs=(logFile,))
    for i in range(chunk_amount):
        offset = i * chunk_size
        remaining_bytes = source_size - offset
        bytes = min([chunk_size, remaining_bytes])
        part_num = i + 1
        start = timer()
        pool.apply_async(_upload_for_multipart, [file_name, offset, mp, part_num, bytes, chunk_amount])
    pool.close()
    pool.join()
    end = timer()
    logger.info('Process complete')
def _get_logger(pdir, ldir, lname, level, fmt):
    try:
        logs_dir = os.path.join(pdir, ldir)
        if not os.path.exists(logs_dir):
            os.makedirs(logs_dir)
    except Exception as e:
        print('{}'.format(e))
        exit(1)
    logging.basicConfig(
        filename=os.path.join(logs_dir, lname),
        level=level,
        format=fmt
    )
    return logging.getLogger(lname)
def init_log(logFile):
    global logger
    exec_file = os.path.abspath(argv[0])
    exec_dir = os.path.dirname(exec_file)
    default_logger = dict(pdir=exec_dir, ldir='logs', lname='error.log', level='ERROR',
                          fmt='%(asctime)s %(levelname)s: %(message)s')
    log_filename = logFile
    level = 'INFO'
    format = '%(asctime)s %(levelname)s: %(message)s'
    default_logger.update(fmt=format, level=level, lname=log_filename)
    if os.path.isabs(log_filename):
        bdir, log_filename = os.path.split(log_filename)
        default_logger.update(pdir='', ldir=bdir, lname=log_filename)
    logger = _get_logger(**default_logger)
if __name__ == "__main__":
    freeze_support()
    parser = argparse.ArgumentParser(description="CLI.")
    group = parser.add_mutually_exclusive_group()
    group.add_argument("-v", "--verbose", action='store_const', const=True,
                       help="Output process messages to stdout channel")
    args = parser.parse_args()
    logFile = 'mylogfile.txt'
    init_log(logFile)
    bucket_name = 'some-bucket'
    if args.verbose:
        try:
            print_handler = logging.StreamHandler(stdout)
            print_handler.setLevel(logging.DEBUG)
            formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s')
            print_handler.setFormatter(formatter)
            logger.addHandler(print_handler)
        except Exception as e:
            logger.exception(e)
    logger.info('Establishing Connection')
    _upload_part('large_testfile.log', bucket_name)
The StreamHandler isn't working in the children because you're only setting it up in the parent process. On Windows, multiprocessing spawns fresh child processes instead of forking, so the workers never inherit the handler you attach in __main__; the only logging setup they see is whatever runs in the Pool initializer. You need to do all of your logging setup inside init_log for it to take effect in the children:
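As a rough sketch of what that could look like (assumptions: I've collapsed _get_logger into init_log to keep it short, the stream_logs parameter name is my own, and the verbose flag gets passed along the same way logFile already is):

def init_log(logFile, stream_logs=False):
    # Runs once in the parent and, via initializer=init_log, once in every
    # spawned pool worker, so each process builds its own handlers.
    global logger
    fmt = '%(asctime)s %(levelname)s: %(message)s'
    # File handler on the root logger, as before.
    logging.basicConfig(filename=logFile, level=logging.INFO, format=fmt)
    logger = logging.getLogger(logFile)
    # The console handler must be added here too; a handler added only in
    # __main__ never reaches the spawned children on Windows.
    if stream_logs:
        print_handler = logging.StreamHandler(stdout)
        print_handler.setLevel(logging.DEBUG)
        print_handler.setFormatter(logging.Formatter(fmt))
        logger.addHandler(print_handler)

In __main__ you would then call init_log(logFile, args.verbose) instead of adding the StreamHandler by hand, and in _upload_part you would create the pool with initargs=(logFile, args.verbose) so that every worker sets up its own console handler.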