Memory limit hit with appengine-mapreduce

Posted 2020-02-26 10:26

I'm working on an appengine-mapreduce job and have modified the demo to fit my purpose. Basically I have over a million lines in the following format: userid, time1, time2. My goal is to find the difference between time1 and time2 for each userid.

However, as I run this on Google App Engine, I encountered this error message in the logs section:

Exceeded soft private memory limit with 180.56 MB after servicing 130 requests total. While handling this request, the process that handled this request was found to be using too much memory and was terminated. This is likely to cause a new process to be used for the next request to your application. If you see this message frequently, you may have a memory leak in your application.

import csv
import logging
import time

def time_count_map(data):
  """Time count map function."""
  (entry, text_fn) = data
  text = text_fn()

  try:
    for line in text.split('\n'):
      reader = csv.reader([line.replace('\0', '')], skipinitialspace=True)
      for s in reader:
        # Calculate time elapsed between the two timestamps.
        start_date = time.strptime(s[1], "%m/%d/%y %I:%M:%S%p")
        end_date = time.strptime(s[2], "%m/%d/%y %I:%M:%S%p")
        time_difference = time.mktime(end_date) - time.mktime(start_date)
        yield (s[0], time_difference)
  except IndexError, e:
    logging.debug(e)


def time_count_reduce(key, values):
  """Time count reduce function."""
  # Note: the accumulator must not be named `time`, or it shadows the module.
  total = 0.0
  for subtime in values:
    total += float(subtime)
  yield "%s: %d\n" % (key, int(total))
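To sanity-check the elapsed-time logic outside App Engine, here is a self-contained sketch of the same parsing and subtraction using only the standard library. The sample line and the `elapsed_seconds` helper name are made up for illustration:

```python
import csv
import time

TIME_FORMAT = "%m/%d/%y %I:%M:%S%p"

def elapsed_seconds(line):
    # Parse one "userid, time1, time2" CSV line and return (userid, seconds).
    row = next(csv.reader([line.replace('\0', '')], skipinitialspace=True))
    start = time.mktime(time.strptime(row[1], TIME_FORMAT))
    end = time.mktime(time.strptime(row[2], TIME_FORMAT))
    return row[0], end - start

# Two timestamps 1.5 hours apart on the same day -> 5400.0 seconds.
print(elapsed_seconds("u1, 01/05/12 01:00:00AM, 01/05/12 02:30:00AM"))
```

Running the computation on a handful of lines locally like this makes it easy to confirm the format string before debugging memory behaviour in the mapper.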

Can anyone suggest how I can optimize my code further? Thanks!

Edited:

Here's the pipeline handler:

class TimeCountPipeline(base_handler.PipelineBase):
  """A pipeline to run Time count demo.

  Args:
    blobkey: blobkey to process as string. Should be a zip archive with
      text files inside.
  """

  def run(self, filekey, blobkey):
    logging.debug("filename is %s" % filekey)
    output = yield mapreduce_pipeline.MapreducePipeline(
        "time_count",
        "main.time_count_map",
        "main.time_count_reduce",
        "mapreduce.input_readers.BlobstoreZipInputReader",
        "mapreduce.output_writers.BlobstoreOutputWriter",
        mapper_params={
            "blob_key": blobkey,
        },
        reducer_params={
            "mime_type": "text/plain",
        },
        shards=32)
    yield StoreOutput("TimeCount", filekey, output)

Mapreduce.yaml:

mapreduce:
- name: Make messages lowercase
  params:
  - name: done_callback
    value: /done
  mapper:
    handler: main.lower_case_posts
    input_reader: mapreduce.input_readers.DatastoreInputReader
    params:
    - name: entity_kind
      default: main.Post
    - name: processing_rate
      default: 100
    - name: shard_count
      default: 4
- name: Make messages upper case
  params:
  - name: done_callback
    value: /done
  mapper:
    handler: main.upper_case_posts
    input_reader: mapreduce.input_readers.DatastoreInputReader
    params:
    - name: entity_kind
      default: main.Post
    - name: processing_rate
      default: 100
    - name: shard_count
      default: 4

The rest of the files are exactly the same as the demo.

I've uploaded a copy of my codes on dropbox: http://dl.dropbox.com/u/4288806/demo%20compressed%20fail%20memory.zip

2 Answers

时光不老,我们不散 · Answered 2020-02-26 10:55

It is likely that your input file exceeds the soft memory limit. For large files, use either BlobstoreLineInputReader or BlobstoreZipLineInputReader.

These input readers pass something different to the map function: the start_position in the file and the line of text.

Your map function might look something like:

def time_count_map(data):
    """Time count map function."""
    # Line readers pass (start_position, line) rather than (entry, text_fn).
    text = data[1]

    try:
        reader = csv.reader([text.replace('\0', '')], skipinitialspace=True)
        for s in reader:
            # Calculate time elapsed between the two timestamps.
            start_date = time.strptime(s[1], "%m/%d/%y %I:%M:%S%p")
            end_date = time.strptime(s[2], "%m/%d/%y %I:%M:%S%p")
            time_difference = time.mktime(end_date) - time.mktime(start_date)
            yield (s[0], time_difference)
    except IndexError, e:
        logging.debug(e)

Using BlobstoreLineInputReader will allow the job to run much faster, as it can use more than one shard (up to 256), but it means you need to upload your files uncompressed, which can be a pain. I handle it by uploading the compressed files to an EC2 Windows server, then decompressing and uploading from there, since that machine's upstream bandwidth is so much larger.
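Since the question's input is a zip archive, switching TimeCountPipeline to the zip line reader is essentially a one-line change. This is a sketch, not runnable on its own, and the exact mapper_params key (blob_key vs. blob_keys) is an assumption that should be checked against your version of mapreduce/input_readers.py:

```python
output = yield mapreduce_pipeline.MapreducePipeline(
    "time_count",
    "main.time_count_map",
    "main.time_count_reduce",
    # Yields (start_position, line) per line instead of whole member files,
    # so the mapper never holds an entire file's text in memory at once.
    "mapreduce.input_readers.BlobstoreZipLineInputReader",
    "mapreduce.output_writers.BlobstoreOutputWriter",
    mapper_params={
        "blob_keys": blobkey,  # plural key assumed for the line readers -- verify
    },
    reducer_params={
        "mime_type": "text/plain",
    },
    shards=32)
```

The map function then needs the (start_position, line) signature shown above rather than the (entry, text_fn) one from BlobstoreZipInputReader.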

ら.Afraid · Answered 2020-02-26 11:00

Also consider calling gc.collect() at regular points in your code. I've seen several SO questions about exceeding the soft memory limit that were alleviated by calling gc.collect(), most having to do with the blobstore.
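A minimal sketch of that idea, collecting periodically inside the processing loop rather than on every record, since gc.collect() itself is not free. The `process_lines` helper, the uppercasing, and the interval of 100 are all illustrative assumptions:

```python
import gc

def process_lines(lines, gc_every=100):
    """Yield uppercased lines, forcing a GC cycle every `gc_every` records."""
    for i, line in enumerate(lines, 1):
        yield line.upper()
        if i % gc_every == 0:
            # Release reference cycles promptly to stay under the soft limit.
            gc.collect()

out = list(process_lines(["a", "b", "c"], gc_every=2))
```

In a mapper you would put the periodic gc.collect() at the bottom of the per-record loop in the same way.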
