I'm working with appengine-mapreduce and have modified the demo to fit my purpose. Basically, I have over a million lines in the following format: userid, time1, time2. My goal is to find the difference between time1 and time2 for each userid.
However, when I run this on Google App Engine, I get this error message in the logs:
Exceeded soft private memory limit with 180.56 MB after servicing 130 requests total
While handling this request, the process that handled this request was found to be using too much memory and was terminated. This is likely to cause a new process to be used for the next request to your application. If you see this message frequently, you may have a memory leak in your application.
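A rough, hypothetical illustration of where memory can go: the map function below reads each whole file with text_fn() and then splits it on newlines, so the full text and a list of per-line strings are alive at the same time. In CPython every line becomes a separate string object with its own header, so the split copy can be considerably larger than the text itself. The sample data and the split_overhead helper are made up for illustration, and exact sizes vary by Python version:

```python
import sys


def split_overhead(text):
    """Return (bytes for the whole string, bytes for the list produced by split).

    Hypothetical helper. Sizes come from sys.getsizeof, which reports each
    object's own footprint in CPython; numbers differ between versions.
    """
    whole = sys.getsizeof(text)
    parts = text.split('\n')
    # The list holds one pointer per line, and every line is a separate
    # string object with its own header.
    split_total = sys.getsizeof(parts) + sum(sys.getsizeof(p) for p in parts)
    return whole, split_total


# Hypothetical sample: 100,000 lines shaped like "userid, time1, time2".
sample = "\n".join(
    "%d, 01/02/12 09:00:00AM, 01/02/12 05:30:00PM" % i for i in range(100000)
)
whole, split_total = split_overhead(sample)
print("whole string: %d bytes, after split: %d bytes" % (whole, split_total))
```

Since the original string is still referenced while the split list exists, peak usage is roughly the sum of both numbers, before counting the csv rows and parsed times.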
import csv
import logging
import time

def time_count_map(data):
    """Time count map function."""
    (entry, text_fn) = data
    text = text_fn()
    for m in text.split('\n'):
        reader = csv.reader([m.replace('\0', '')], skipinitialspace=True)
        for s in reader:
            try:
                # Calculate time elapsed
                start_date = time.strptime(s[1], "%m/%d/%y %I:%M:%S%p")
                end_date = time.strptime(s[2], "%m/%d/%y %I:%M:%S%p")
            except IndexError as e:
                # Skip a short or blank line instead of abandoning the rest of the file
                logging.debug(e)
                continue
            time_difference = time.mktime(end_date) - time.mktime(start_date)
            yield (s[0], time_difference)
def time_count_reduce(key, values):
    """Time count reduce function."""
    # Use a name other than "time" so the time module is not shadowed
    total = 0.0
    for subtime in values:
        total += float(subtime)
    yield "%s: %d\n" % (key, int(total))
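To check the map and reduce logic without deploying, the whole map, shuffle and reduce flow can be simulated in memory on a few lines. This is a hypothetical local harness (map_lines, shuffle, reduce_times and run_locally are illustrative names, not part of appengine-mapreduce). It mirrors the parsing in time_count_map, but uses calendar.timegm instead of time.mktime so the result does not depend on the local timezone, and it skips malformed lines rather than stopping at the first one:

```python
import calendar
import csv
import time
from collections import defaultdict

DATE_FORMAT = "%m/%d/%y %I:%M:%S%p"  # same format string as time_count_map


def map_lines(text):
    """Yield (userid, seconds) for each well-formed line of text."""
    for row in csv.reader(text.replace('\0', '').splitlines(), skipinitialspace=True):
        if len(row) < 3:
            continue  # skip blank or malformed lines instead of stopping
        start = time.strptime(row[1], DATE_FORMAT)
        end = time.strptime(row[2], DATE_FORMAT)
        # calendar.timegm treats both timestamps as UTC, so the result does
        # not depend on the local timezone or DST of the machine running it.
        yield row[0], calendar.timegm(end) - calendar.timegm(start)


def shuffle(pairs):
    """Group mapped values by key, standing in for the MapReduce shuffle."""
    groups = defaultdict(list)
    for key, value in pairs:
        # Store values as strings, matching the float() call in time_count_reduce.
        groups[key].append(str(value))
    return groups


def reduce_times(key, values):
    """Sum the elapsed seconds for one user, like time_count_reduce."""
    total = sum(float(v) for v in values)
    return "%s: %d\n" % (key, int(total))


def run_locally(text):
    groups = shuffle(map_lines(text))
    return [reduce_times(key, groups[key]) for key in sorted(groups)]


# Hypothetical sample in the "userid, time1, time2" format, with a blank line.
sample = (
    "u1, 01/02/12 09:00:00AM, 01/02/12 05:30:00PM\n"
    "u2, 01/02/12 11:59:00PM, 01/03/12 12:01:00AM\n"
    "\n"
    "u1, 01/03/12 10:00:00AM, 01/03/12 10:15:30AM\n"
)
print(run_locally(sample))
# prints ['u1: 31530\n', 'u2: 120\n']
```

Here u1 gets 30600 + 930 seconds and u2 gets 120 seconds across midnight; the blank line is ignored.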
Can anyone suggest how I can optimize my code further? Thanks!
Edit:
Here's the pipeline handler:
class TimeCountPipeline(base_handler.PipelineBase):
    """A pipeline to run Time count demo.
    Args:
      blobkey: blobkey to process as string. Should be a zip archive with
        text files inside.
    """
    def run(self, filekey, blobkey):
        logging.debug("filename is %s" % filekey)
        output = yield mapreduce_pipeline.MapreducePipeline(
            "time_count",
            "main.time_count_map",
            "main.time_count_reduce",
            "mapreduce.input_readers.BlobstoreZipInputReader",
            "mapreduce.output_writers.BlobstoreOutputWriter",
            mapper_params={
                "blob_key": blobkey,
            },
            reducer_params={
                "mime_type": "text/plain",
            },
            shards=32)
        yield StoreOutput("TimeCount", filekey, output)
mapreduce.yaml:
mapreduce:
- name: Make messages lowercase
  params:
  - name: done_callback
    value: /done
  mapper:
    handler: main.lower_case_posts
    input_reader: mapreduce.input_readers.DatastoreInputReader
    params:
    - name: entity_kind
      default: main.Post
    - name: processing_rate
      default: 100
    - name: shard_count
      default: 4
- name: Make messages upper case
  params:
  - name: done_callback
    value: /done
  mapper:
    handler: main.upper_case_posts
    input_reader: mapreduce.input_readers.DatastoreInputReader
    params:
    - name: entity_kind
      default: main.Post
    - name: processing_rate
      default: 100
    - name: shard_count
      default: 4
The rest of the files are exactly the same as the demo.
I've uploaded a copy of my code to Dropbox: http://dl.dropbox.com/u/4288806/demo%20compressed%20fail%20memory.zip
It is likely that your input file is larger than the soft memory limit allows. For big files, use either BlobstoreLineInputReader or BlobstoreZipLineInputReader. These input readers pass something different to the map function: they pass the start_position in the file and the line of text, so your map function handles one line per call instead of a whole file.
Using BlobstoreLineInputReader will allow the job to run much faster, as it can use more than one shard (up to 256), but it means you need to upload your files uncompressed, which can be a pain. I handle it by uploading the compressed files to an EC2 Windows server, then decompressing and uploading from there, since its upstream bandwidth is so much larger.
Also consider calling gc.collect() at regular points in your code. I've seen several SO questions about exceeding soft memory limits that were alleviated by calling gc.collect(), most having to do with blobstore.
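A minimal sketch of a per-line map function, assuming the reader delivers each record as a (start_position, line) pair as described above. time_count_line_map, GC_EVERY and the 10,000-line interval are hypothetical. gc.collect() only reclaims objects caught in reference cycles (CPython frees everything else by reference counting as soon as it becomes unreachable), so the periodic call is a cheap experiment, not a guaranteed fix:

```python
import calendar
import csv
import gc
import itertools
import time

DATE_FORMAT = "%m/%d/%y %I:%M:%S%p"  # same format string as the question's map function
GC_EVERY = 10000  # hypothetical interval; tune it against your own memory logs
_line_counter = itertools.count(1)  # shared by every call made in this process


def time_count_line_map(data):
    """Hypothetical map function for a line-based Blobstore input reader.

    Assumes data is a (start_position, line) pair; start_position is unused here.
    """
    start_position, line = data
    # Every GC_EVERY lines, ask the cycle collector to run.
    if next(_line_counter) % GC_EVERY == 0:
        gc.collect()
    for row in csv.reader([line.replace('\0', '')], skipinitialspace=True):
        if len(row) < 3:
            continue  # blank or malformed line: emit nothing
        start = time.strptime(row[1], DATE_FORMAT)
        end = time.strptime(row[2], DATE_FORMAT)
        # calendar.timegm reads both timestamps as UTC, avoiding DST surprises.
        yield row[0], calendar.timegm(end) - calendar.timegm(start)


print(list(time_count_line_map((0, "u1, 01/02/12 09:00:00AM, 01/02/12 05:30:00PM"))))
# prints [('u1', 30600)]
```

To try it, the pipeline would reference this function instead of main.time_count_map and name one of the line-based readers as its input reader; the exact mapper_params each reader expects are worth checking in the library source before switching.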