Python - Using StreamHandler in multiprocessing

Posted 2019-06-08 09:59

I have a CLI script which logs all of its activity to a log file. One of its functions uploads a large file by splitting it into pieces and uploading them in parallel. On Linux the whole thing works like a charm, but on Windows I can't seem to stream the log entries of the child process (_upload_for_multipart) using the StreamHandler from the logging module. The logger.info statements in _upload_for_multipart are correctly written to the log file (mylogfile.txt), but they are not streamed to the terminal when the CLI's verbose argument is given. Statements in all other functions are both streamed and logged. Any help? Below is a complete working example of the problem I face; apart from filechunkio, you will not need any extra libraries to run it.

import argparse, glob, logging, math, os
from timeit import default_timer as timer
from filechunkio.filechunkio import FileChunkIO
from multiprocessing import cpu_count, Pool, freeze_support, current_process
from sys import exit, exc_info, argv, stdout, version_info, platform
from time import mktime, strptime

logger = None

def _upload_for_multipart(keyname, offset, multipart, part_num,bytes, parts):
    try:
        with FileChunkIO(keyname, 'r', offset=offset, bytes=bytes) as fp:
                try:
                    start = timer()
                    logger.info( 'Uploading part {0}/{1}'.format ( part_num, parts ) )
                    logger.info('Uploading im MP')
                    end = timer()
                except Exception as e:
                    logger.error('Some error occured')
                    exit()
        logger.info( 'UPLOADED part {0}/{1} time = {2:0.1f}s Size: {3}'.format (part_num, parts, (end - start), bytes ) )
    except Exception as e:
        logger.error( 'FAILED uploading {0}: {1}'.format( keyname, e ) )
        exit(1)

def _upload_part(argFile, argBucket, **core_chunk):
    file_path = argFile
    bucket_name = argBucket
    file_name = os.path.basename( file_path )
    source_size = os.stat( file_path ).st_size

    chunk_size = max(int(math.sqrt(5242880) * math.sqrt(source_size)),
                         5242880)

    chunk_amount = int(math.ceil(source_size / float(chunk_size)))
    #mp = s3_bucket.initiate_multipart_upload( file_name )
    mp = ''
    logger.info('Initiate multipart upload')
    logger.info( 'File size of {0} is {1}. Parallel uploads will be used to speed up the process'\
            .format( file_name, source_size  ) )

    start_time = timer()
    pool = Pool(processes=1, initializer = init_log, initargs = ( logFile, ) )
    for i in range( chunk_amount ):
        offset = i * chunk_size
        remaining_bytes = source_size - offset
        bytes = min( [chunk_size, remaining_bytes] )
        part_num = i + 1
        start = timer()
        pool.apply_async( _upload_for_multipart, [file_name, offset, mp, part_num, bytes, chunk_amount] )
    pool.close()
    pool.join()
    end = timer()
    logger.info('Process complete')

def _get_logger( pdir, ldir, lname, level, fmt ):
    try:
        logs_dir = os.path.join( pdir, ldir )
        if not os.path.exists( logs_dir ):
            os.makedirs( logs_dir )
    except Exception as e:
        print ('{}'.format(e))
        exit(1)

    logging.basicConfig(
            filename=os.path.join(logs_dir, lname),
            level=level,
            format=fmt
    )
    return logging.getLogger( lname )

def init_log(logFile):
    global logger
    exec_file = os.path.abspath( argv[0] )
    exec_dir = os.path.dirname( exec_file )
    default_logger = dict( pdir=exec_dir, ldir='logs', lname='error.log', level='ERROR',
                           fmt='%(asctime)s %(levelname)s: %(message)s' )

    log_filename = logFile
    level = 'INFO'
    format = '%(asctime)s %(levelname)s: %(message)s'

    default_logger.update( fmt=format, level=level, lname = log_filename )
    if os.path.isabs( log_filename ):
        bdir, log_filename = os.path.split( log_filename )
        default_logger.update(pdir='', ldir = bdir, lname = log_filename )
    logger = _get_logger( **default_logger )

if __name__ == "__main__":

    freeze_support()

    parser = argparse.ArgumentParser( description="CLI." )
    group = parser.add_mutually_exclusive_group()
    group.add_argument( "-v", "--verbose", action='store_const', const=True, help="Output process messages to stdout \
                        channel")
    args = parser.parse_args()

    logFile = 'mylogfile.txt'
    init_log(logFile)

    bucket_name = 'some-bucket'

    if args.verbose:
        try:
            print_handler = logging.StreamHandler( stdout )
            print_handler.setLevel( logging.DEBUG )
            formatter = logging.Formatter( '%(asctime)s %(levelname)s: %(message)s' )
            print_handler.setFormatter( formatter )
            logger.addHandler( print_handler )
        except Exception as e:
            logger.exception( e )

    logger.info('Establishing Connection')
    _upload_part('large_testfile.log', bucket_name)

1 Answer

时光不老,我们不散
Answered 2019-06-08 10:27

The StreamHandler isn't working in the child processes because you only set it up in the parent. On Windows, multiprocessing starts each worker with the spawn method, so children do not inherit handlers added in the parent (unlike fork on Linux, which is why it works there). Do all of your logging setup inside init_log so that it takes effect in every child:

# ... This stuff is the same...    

def _upload_part(argFile, argBucket, verbose, **core_chunk):  # Add verbose argument
    #... Same until you declare the Pool
    pool = Pool(processes=1, initializer=init_log, initargs=(logFile, verbose))  # Add verbose to initargs
   # All the same ...

def init_log(logFile, verbose):
    global logger
    exec_file = os.path.abspath( argv[0] )
    exec_dir = os.path.dirname( exec_file )
    default_logger = dict( pdir=exec_dir, ldir='logs', lname='error.log', level='ERROR',
                           fmt='%(asctime)s %(levelname)s: %(message)s' )

    log_filename = logFile
    level = 'INFO'
    format = '%(asctime)s %(levelname)s: %(message)s'

    default_logger.update( fmt=format, level=level, lname = log_filename )
    if os.path.isabs( log_filename ):
        bdir, log_filename = os.path.split( log_filename )
        default_logger.update(pdir='', ldir = bdir, lname = log_filename )
    logger = _get_logger( **default_logger )

    if verbose:  # Set up StreamHandler here
        try:
            print_handler = logging.StreamHandler(stdout)
            print_handler.setLevel(logging.DEBUG)
            formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s')
            print_handler.setFormatter(formatter)
            logger.addHandler(print_handler)
        except Exception as e:
            logger.exception(e)

if __name__ == "__main__":

    freeze_support()

    parser = argparse.ArgumentParser( description="CLI." )
    group = parser.add_mutually_exclusive_group()
    group.add_argument( "-v", "--verbose", action='store_const', const=True, help="Output process messages to stdout \
                        channel")
    args = parser.parse_args()

    logFile = 'mylogfile.txt'
    init_log(logFile, args.verbose)  # init_log now takes verbose here too

    bucket_name = 'some-bucket'

    logger.info('Establishing Connection')
    _upload_part('large_testfile.log', bucket_name, args.verbose)  # Pass args.verbose