We are generating ~10k NumPy arrays with Keras and finally need to save them as .npy files to S3. The problem is that, to save to S3 from inside a Spark map function, we currently have to create an intermediate file. What we want instead is to stream the arrays directly to S3 without creating intermediate files. I tried the "Cottoncandy" library, but it does not work inside the Spark map function and throws this error:
pickle.PicklingError: Could not serialize object: TypeError: can't pickle thread.lock objects
Is there any way or library that we can use inside a Spark map function, in a deep learning application, to stream the NumPy arrays directly to S3?
My RDD of NumPy arrays is:
features_rdd
Options I tried. Option 1:
def writePartition(xs):
    """Upload every array of one RDD partition to S3 as a ``.npy`` object.

    The cottoncandy interface is created inside the function, once per call,
    so it is not part of what has to be serialized with the function.

    Args:
        xs: iterator of ``(k, v)`` pairs; ``k`` is passed to
            ``get_file_with_parents`` and ``v`` is handed to
            ``cci.upload_npy_array``.
    """
    # NOTE(review): LOGGER is a module-level global that is serialized along
    # with this function. If it is a logging.Logger on Python 2.7 it may be
    # the source of "can't pickle thread.lock objects" -- confirm, and if so
    # obtain the logger inside this function instead.
    cci = cc.get_interface(
        'BUCKET_NAME',
        ACCESS_KEY=os.environ.get("AWS_ACCESS_KEY_ID"),
        SECRET_KEY=os.environ.get("AWS_SECRET_ACCESS_KEY"),
        endpoint_url='https://s3.amazonaws.com')
    # output_path, format_name
    # Everything after the first '/' of OUTPUT is the key prefix; it does not
    # depend on the record, so it is computed once instead of per item.
    rest_of_path = OUTPUT.split('/', 1)[1]
    for k, v in xs:
        file_name_with_domain = get_file_with_parents(k, 1)
        file_name = ...  # placeholder: the real expression is not shown here
        file_name_without_ext = get_file_name_without_ext(file_name)
        # Append the extension exactly once (previously '.' + '.npy' produced
        # keys ending in "..npy").
        final_path = rest_of_path + '/' + file_name_without_ext + '.npy'
        LOGGER.info("Saving to S3....")
        cci.upload_npy_array(final_path, v)
# The RDD method is spelled foreachPartition (capital P); the all-lowercase
# spelling is not an RDD attribute and fails with AttributeError.
features_rdd.foreachPartition(writePartition)
Option 2:
def writePartition1(xs):
    """Write each array of a partition to a local file, then upload it to S3.

    The boto3 client is created inside the function, once per call.

    Args:
        xs: iterator of ``(k, v)`` pairs; ``v`` is the array passed to
            ``np.save``. The lines that derive ``local_dir_full_path`` and
            ``s3_full_path`` are elided in this snippet.
    """
    s3 = boto3.client('s3', region_name='us-east-1')
    for k, v in xs:
        ...
        ...
        # NOTE(review): np.save appends ".npy" when given a string path that
        # lacks it -- confirm local_dir_full_path already ends in ".npy",
        # otherwise the upload/remove below look at a different file name.
        # np.save also accepts a file-like object, so an in-memory buffer
        # could replace this temp file entirely.
        np.save(local_dir_full_path, v)
        try:
            s3.upload_file(local_dir_full_path, 'BUCKET', s3_full_path)
        finally:
            # Remove the intermediate file even when the upload raises, so
            # failed uploads do not leave files behind on the worker.
            os.remove(local_dir_full_path)
# The RDD method is spelled foreachPartition (capital P); the all-lowercase
# spelling is not an RDD attribute and fails with AttributeError.
features_rdd.foreachPartition(writePartition1)
Error:-
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 655, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib64/python2.7/pickle.py", line 687, in _batch_setitems
save(v)
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 606, in save_list
self._batch_appends(iter(obj))
File "/usr/lib64/python2.7/pickle.py", line 642, in _batch_appends
save(tmp[0])
File "/usr/lib64/python2.7/pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 600, in save_reduce
save(state)
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 655, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib64/python2.7/pickle.py", line 687, in _batch_setitems
save(v)
File "/usr/lib64/python2.7/pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 600, in save_reduce
save(state)
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 655, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib64/python2.7/pickle.py", line 687, in _batch_setitems
save(v)
File "/usr/lib64/python2.7/pickle.py", line 306, in save
rv = reduce(self.proto)
TypeError: can't pickle thread.lock objects
Traceback (most recent call last):
File "six_file_boto3_write1.py", line 248, in <module>
run()
File "six_file_boto3_write1.py", line 239, in run
features_rdd.foreachPartition(writePartitionWithBoto)
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 799, in foreachPartition
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 1041, in count
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 1032, in sum
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 906, in fold
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 809, in collect
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 2455, in _jrdd
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 2388, in _wrap_function
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 2374, in _prepare_for_python_RDD
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/serializers.py", line 464, in dumps
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 704, in dumps
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 162, in dump
pickle.PicklingError: Could not serialize object: TypeError: can't pickle thread.lock objects
imports:-
from pyspark.sql import SparkSession
from keras.preprocessing import image
from keras.applications.vgg16 import VGG16
from keras.models import Model
from io import BytesIO
from keras.applications.vgg16 import preprocess_input
import numpy as np
import logging
import os
import boto3
import cottoncandy as cc
So, basically, the application works fine up to features_rdd; I can even verify the count. The part that fails is saving these features. I have added the imports above.
Update:
def extract_features(model, obj):
    """Run one encoded image through ``model`` and return its feature array.

    Args:
        model: object exposing ``predict``.
        obj: raw bytes of an image file; wrapped in ``BytesIO`` for loading.

    Returns:
        The first row of ``model.predict`` for the preprocessed image, or an
        empty list if any step raised an exception (the error is printed,
        not re-raised).
    """
    try:
        print('executing vgg16 feature extractor...')
        # target_size is documented as (height, width); the channel count is
        # not part of it, so the former third element has been dropped.
        img = image.load_img(BytesIO(obj), target_size=(224, 224))
        img_data = image.img_to_array(img)
        # Add a leading axis so a single image is passed to predict as a
        # batch of one.
        img_data = np.expand_dims(img_data, axis=0)
        img_data = preprocess_input(img_data)
        vgg16_feature = model.predict(img_data)[0]
        print('++++++++++++++++++++++++++++',vgg16_feature.shape)
        return vgg16_feature
    except Exception as e:
        # Best effort: one unreadable image must not fail the whole
        # partition, so report it and hand back an empty result.
        print('Error......{}'.format(e.args))
        return []
def extract_features_(xs):
    """Yield ``(key, features)`` for every ``(key, value)`` pair in ``xs``.

    ``initVGG16`` is called once per invocation, before the first pair is
    processed, and its result is reused for every pair.
    """
    vgg16 = initVGG16()
    for pair in xs:
        key, payload = pair
        yield key, extract_features(vgg16, payload)
# Build (or reuse) the Spark session for this job.
spark = (
    SparkSession.builder
    .appName('test-app')
    .getOrCreate()
)
sc = spark.sparkContext

# Read the binary files found under RESOLVED_IMAGE_PATH and mark the RDD as
# persisted.
s3_files_rdd = sc.binaryFiles(RESOLVED_IMAGE_PATH)
s3_files_rdd.persist()

# Feature extraction runs partition by partition via extract_features_.
features_rdd = s3_files_rdd.mapPartitions(extract_features_)