can't pickle _thread.RLock objects when running Ray Tune

Posted 2019-07-27 11:56

Question:

I am trying to do hyperparameter tuning with Ray's Tune package.

Shown below is my code:

# Disable linter warnings to maintain consistency with tutorial.
# pylint: disable=invalid-name
# pylint: disable=g-bad-import-order


from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import tensorflow as tf
import matplotlib as mplt
mplt.use('agg')  # Must be before importing matplotlib.pyplot or pylab!
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.metrics import mean_squared_error
from sklearn.metrics import mean_absolute_error
from math import sqrt
import csv

import argparse
import sys
import tempfile
import pandas as pd
import time

import ray
from ray.tune import grid_search, run_experiments, register_trainable

from tensorflow.examples.tutorials.mnist import input_data
from ray import tune



class RNNconfig():

    num_steps = 14
    lstm_size = 32
    batch_size = 8
    init_learning_rate = 0.01
    learning_rate_decay = 0.99
    init_epoch = 5  # 5
    max_epoch = 60  # 100 or 50
    hidden1_nodes = 30
    hidden2_nodes = 15
    hidden1_activation = tf.nn.relu
    hidden2_activation = tf.nn.relu
    lstm_activation = tf.nn.relu
    status_reporter = None
    FLAGS = None

    input_size = 1
    num_layers = 1
    fileName = 'store2_1.csv'
    graph = tf.Graph()
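    # NOTE: creating a tf.Graph() in the class body means it exists at import
    # time; as the answer below explains, a Graph holds a _thread.RLock and
    # cannot be pickled when Ray serializes the trainable.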
    column_min_max = [[0,11000], [1,7]]
    columns = ['Sales', 'DayOfWeek','SchoolHoliday', 'Promo']
    features = len(columns)

rnn_config = RNNconfig()


def segmentation(data):

    seq = [price for tup in data[rnn_config.columns].values for price in tup]

    seq = np.array(seq)

    # split into items of features
    seq = [np.array(seq[i * rnn_config.features: (i + 1) * rnn_config.features])
           for i in range(len(seq) // rnn_config.features)]

    # split into groups of num_steps
    X = np.array([seq[i: i + rnn_config.num_steps] for i in range(len(seq) - rnn_config.num_steps)])

    y = np.array([seq[i + rnn_config.num_steps] for i in range(len(seq) - rnn_config.num_steps)])

    # get only sales value
    y = [[y[i][0]] for i in range(len(y))]

    y = np.asarray(y)

    return X, y

def scale(data):

    for i in range(len(rnn_config.column_min_max)):
        data[rnn_config.columns[i]] = (data[rnn_config.columns[i]] - rnn_config.column_min_max[i][0]) / ((rnn_config.column_min_max[i][1]) - (rnn_config.column_min_max[i][0]))

    return data

def rescale(test_pred):

    prediction = [(pred * (rnn_config.column_min_max[0][1] - rnn_config.column_min_max[0][0])) + rnn_config.column_min_max[0][0] for pred in test_pred]

    return prediction


def pre_process():
    store_data = pd.read_csv(rnn_config.fileName)

    store_data = store_data.drop(store_data[(store_data.Open == 0) & (store_data.Sales == 0)].index)
    #
    # store_data = store_data.drop(store_data[(store_data.Open != 0) & (store_data.Sales == 0)].index)

    # ---for segmenting original data --------------------------------
    original_data = store_data.copy()

    ## train_size = int(len(store_data) * (1.0 - rnn_config.test_ratio))



    validation_len = len(store_data[(store_data.Month == 6) & (store_data.Year == 2015)].index)
    test_len = len(store_data[(store_data.Month == 7) & (store_data.Year == 2015)].index)
    train_size = int(len(store_data) -  (validation_len+test_len))

    train_data = store_data[:train_size]
    validation_data = store_data[(train_size-rnn_config.num_steps): validation_len+train_size]
    test_data = store_data[((validation_len+train_size) - rnn_config.num_steps): ]
    original_val_data = validation_data.copy()
    original_test_data = test_data.copy()


    # -------------- processing train data---------------------------------------
    scaled_train_data = scale(train_data)
    train_X, train_y = segmentation(scaled_train_data)

    # -------------- processing validation data---------------------------------------
    scaled_validation_data = scale(validation_data)
    val_X, val_y = segmentation(scaled_validation_data)


    # -------------- processing test data---------------------------------------
    scaled_test_data = scale(test_data)
    test_X, test_y = segmentation(scaled_test_data)

    # ----segmenting original validation data-----------------------------------------------
    nonescaled_val_X, nonescaled_val_y = segmentation(original_val_data)


    # ----segmenting original test data-----------------------------------------------
    nonescaled_test_X, nonescaled_test_y = segmentation(original_test_data)



    return train_X, train_y, test_X, test_y, val_X, val_y, nonescaled_test_y,nonescaled_val_y


def generate_batches(train_X, train_y, batch_size):
    num_batches = len(train_X) // batch_size
    if batch_size * num_batches < len(train_X):
        num_batches += 1

    batch_indices = range(num_batches)
    for j in batch_indices:
        batch_X = train_X[j * batch_size: (j + 1) * batch_size]
        batch_y = train_y[j * batch_size: (j + 1) * batch_size]
        assert set(map(len, batch_X)) == {rnn_config.num_steps}
        yield batch_X, batch_y

def mean_absolute_percentage_error(y_true, y_pred):
    y_true, y_pred = np.array(y_true), np.array(y_pred)
    itemindex = np.where(y_true == 0)
    y_true = np.delete(y_true, itemindex)
    y_pred = np.delete(y_pred, itemindex)
    return np.mean(np.abs((y_true - y_pred) / y_true)) * 100

def RMSPE(y_true, y_pred):
    y_true, y_pred = np.array(y_true), np.array(y_pred)
    return np.sqrt(np.mean(np.square((y_true - y_pred) / y_true), axis=0))



def deepnn(inputs):

    cell = tf.contrib.rnn.LSTMCell(rnn_config.lstm_size, state_is_tuple=True, activation=rnn_config.lstm_activation)

    val1, _ = tf.nn.dynamic_rnn(cell, inputs, dtype=tf.float32)

    val = tf.transpose(val1, [1, 0, 2])

    last = tf.gather(val, int(val.get_shape()[0]) - 1, name="last_lstm_output")

    # hidden layer
    hidden1 = tf.layers.dense(last, units=rnn_config.hidden1_nodes, activation=rnn_config.hidden1_activation)
    hidden2 = tf.layers.dense(hidden1, units=rnn_config.hidden2_nodes, activation=rnn_config.hidden2_activation)

    weight = tf.Variable(tf.truncated_normal([rnn_config.hidden2_nodes, rnn_config.input_size]))
    bias = tf.Variable(tf.constant(0.1, shape=[rnn_config.input_size]))

    prediction = tf.matmul(hidden2, weight) + bias

    return prediction


def main(_=None):  # tf.app.run() passes the parsed argv list as a positional argument



    train_X, train_y, test_X, test_y, val_X, val_y, nonescaled_test_y, nonescaled_val_y = pre_process()





    # Create the model
    inputs = tf.placeholder(tf.float32, [None, rnn_config.num_steps, rnn_config.features], name="inputs")
    targets = tf.placeholder(tf.float32, [None, rnn_config.input_size], name="targets")
    learning_rate = tf.placeholder(tf.float32, None, name="learning_rate")

    # Build the graph for the deep net
    prediction = deepnn(inputs)

    with tf.name_scope('loss'):
        model_loss = tf.losses.mean_squared_error(targets, prediction)

    with tf.name_scope('adam_optimizer'):
        optimizer = tf.train.AdamOptimizer(learning_rate).minimize(model_loss)
        # train_step = tf.train.AdamOptimizer(1e-4).minimize(cross_entropy)


    graph_location = "checkpoints_sales/sales_pred.ckpt"
    # graph_location = tempfile.mkdtemp()
    print('Saving graph to: %s' % graph_location)
    train_writer = tf.summary.FileWriter(graph_location)
    train_writer.add_graph(tf.get_default_graph())

    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())

        learning_rates_to_use = [
            rnn_config.init_learning_rate * (
                    rnn_config.learning_rate_decay ** max(float(i + 1 - rnn_config.init_epoch), 0.0)
            ) for i in range(rnn_config.max_epoch)]

        for epoch_step in range(rnn_config.max_epoch):

            current_lr = learning_rates_to_use[epoch_step]

            i = 0

            for batch_X, batch_y in generate_batches(train_X, train_y, rnn_config.batch_size):

                train_data_feed = {
                    inputs: batch_X,
                    targets: batch_y,
                    learning_rate: current_lr,
                }

                train_loss, _ = sess.run([model_loss, optimizer], train_data_feed)

                if i % 10 == 0:
                    val_data_feed = {
                        inputs: val_X,
                        targets: val_y,
                        learning_rate: 0.0,
                    }

                    val_prediction = prediction.eval(feed_dict=val_data_feed)

                    meanSquaredError = mean_squared_error(val_y, val_prediction)
                    val_rootMeanSquaredError = sqrt(meanSquaredError)
                    print('epoch %d, step %d, validation RMSE %g' % (epoch_step, i, val_rootMeanSquaredError))

                    if rnn_config.status_reporter:
                        rnn_config.status_reporter(timesteps_total=epoch_step, mean_accuracy=val_rootMeanSquaredError)

                i += 1

        test_data_feed = {
            inputs: test_X,
            targets: test_y,
            learning_rate: 0.0,
        }

        test_prediction = prediction.eval(feed_dict=test_data_feed)
        meanSquaredError = mean_squared_error(test_y, test_prediction)
        test_rootMeanSquaredError = sqrt(meanSquaredError)
        print('test RMSE %g' % test_rootMeanSquaredError)


# !!! Entrypoint for ray.tune !!!
def train(config, reporter=None):

    rnn_config.status_reporter = reporter
    rnn_config.num_steps = config["num_steps"]
    rnn_config.lstm_size = config["lstm_size"]
    rnn_config.hidden1_nodes = config["hidden1_nodes"]
    rnn_config.hidden2_nodes = config["hidden2_nodes"]
    rnn_config.lstm_activation = getattr(tf.nn, config["lstm_activation"])
    rnn_config.init_learning_rate = config["learning_rate"]
    rnn_config.hidden1_activation = getattr(tf.nn, config["hidden1_activation"])
    rnn_config.hidden2_activation = getattr(tf.nn, config["hidden2_activation"])
    rnn_config.learning_rate_decay = config["learning_rate_decay"]
    rnn_config.max_epoch = config["max_epoch"]
    rnn_config.init_epoch = config["init_epoch"]
    rnn_config.batch_size = config["batch_size"]

    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--data_dir',
        type=str,
        default='/tmp/tensorflow/mnist/input_data',
        help='Directory for storing input data')
    rnn_config.FLAGS, unparsed = parser.parse_known_args()
    tf.app.run(main=main, argv=[sys.argv[0]] + unparsed)


# !!! Example of using the ray.tune Python API !!!
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--smoke-test', action='store_true', help='Finish quickly for testing')
    args, _ = parser.parse_known_args()

    register_trainable('train_mnist', train)
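    # register_trainable() serializes `train` with cloudpickle, along with
    # everything `train` references at module level; this is the call where
    # the pickling error in the traceback above is raised.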
    mnist_spec = {
        'run': 'train_mnist',
        'stop': {
            'mean_accuracy': 0.99,
        },
        'config': {
             "num_steps": tune.grid_search([1, 2, 3,4,5,6,7,8,9,10,11,12,13,14,15]),
            "lstm_size": tune.grid_search([8,16,32,64,128]),
            "hidden1_nodes" : tune.grid_search([4,8,16,32,64]),
            "hidden2_nodees" : tune.grid_search([2,4,8,16,32]),
            "lstm_activation" : tune.grid_search(['relu', 'elu', 'tanh']),
            "learning_rate" : tune.grid_search([0.01,0.1,0.5,0.05]),
            "hidden1_activation" : tune.grid_search(['relu', 'elu', 'tanh']),
            "hidden2_activation" : tune.grid_search(['relu', 'elu', 'tanh']),
            "learning_rate_decay" : tune.grid_search([0.99,0.8,0.7]),
            "max_epoch" : tune.grid_search([60,50,100,120,200]),
            "init_epoch" : tune.grid_search([5,10,15,20]),
            "batch_size" : tune.grid_search([5,8,16,32,64])
        },
    }

    if args.smoke_test:
        mnist_spec['stop']['training_iteration'] = 2

    ray.init()
    run_experiments({'tune_mnist_test': mnist_spec})

When I try to run this, I get an error; the stack trace is shown below. This is my first time using Tune, so I am not sure what I'm doing wrong here. Also note that the example algorithms that ship with Ray run fine on my machine.

/home/suleka/anaconda3/lib/python3.6/site-packages/h5py/__init__.py:36: FutureWarning: Conversion of the second argument of issubdtype from `float` to `np.floating` is deprecated. In future, it will be treated as `np.float64 == np.dtype(float).type`.
  from ._conv import register_converters as _register_converters
WARNING: Not updating worker name since `setproctitle` is not installed. Install this with `pip install setproctitle` (or ray[debug]) to enable monitoring of worker processes.
Traceback (most recent call last):
  File "/home/suleka/anaconda3/lib/python3.6/pickle.py", line 918, in save_global
    obj2, parent = _getattribute(module, name)
  File "/home/suleka/anaconda3/lib/python3.6/pickle.py", line 266, in _getattribute
    .format(name, obj))
AttributeError: Can't get local attribute 'wrap_function.<locals>.WrappedFunc' on

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/suleka/anaconda3/lib/python3.6/site-packages/ray/cloudpickle/cloudpickle.py", line 639, in save_global
    return Pickler.save_global(self, obj, name=name)
  File "/home/suleka/anaconda3/lib/python3.6/pickle.py", line 922, in save_global
    (obj, module_name, name))
_pickle.PicklingError: Can't pickle <class 'ray.tune.trainable.wrap_function.<locals>.WrappedFunc'>: it's not found as ray.tune.trainable.wrap_function.<locals>.WrappedFunc

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/suleka/Documents/sales_prediction/auto_LSTM_withoutZero.py", line 322, in <module>
    register_trainable('train_mnist', train)
  File "/home/suleka/anaconda3/lib/python3.6/site-packages/ray/tune/registry.py", line 38, in register_trainable
    _global_registry.register(TRAINABLE_CLASS, name, trainable)
  File "/home/suleka/anaconda3/lib/python3.6/site-packages/ray/tune/registry.py", line 77, in register
    self._to_flush[(category, key)] = pickle.dumps(value)
  File "/home/suleka/anaconda3/lib/python3.6/site-packages/ray/cloudpickle/cloudpickle.py", line 881, in dumps
    cp.dump(obj)
  File "/home/suleka/anaconda3/lib/python3.6/site-packages/ray/cloudpickle/cloudpickle.py", line 268, in dump
    return Pickler.dump(self, obj)
  File "/home/suleka/anaconda3/lib/python3.6/pickle.py", line 409, in dump
    self.save(obj)
  File "/home/suleka/anaconda3/lib/python3.6/pickle.py", line 476, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/home/suleka/anaconda3/lib/python3.6/site-packages/ray/cloudpickle/cloudpickle.py", line 648, in save_global
    return self.save_dynamic_class(obj)
  File "/home/suleka/anaconda3/lib/python3.6/site-packages/ray/cloudpickle/cloudpickle.py", line 495, in save_dynamic_class
    save(clsdict)
  File "/home/suleka/anaconda3/lib/python3.6/pickle.py", line 476, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/home/suleka/anaconda3/lib/python3.6/pickle.py", line 821, in save_dict
    self._batch_setitems(obj.items())
  File "/home/suleka/anaconda3/lib/python3.6/pickle.py", line 847, in _batch_setitems
    save(v)
  File "/home/suleka/anaconda3/lib/python3.6/pickle.py", line 476, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/home/suleka/anaconda3/lib/python3.6/site-packages/ray/cloudpickle/cloudpickle.py", line 410, in save_function
    self.save_function_tuple(obj)
  File "/home/suleka/anaconda3/lib/python3.6/site-packages/ray/cloudpickle/cloudpickle.py", line 553, in save_function_tuple
    save(state)
  File "/home/suleka/anaconda3/lib/python3.6/pickle.py", line 476, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/home/suleka/anaconda3/lib/python3.6/pickle.py", line 821, in save_dict
    self._batch_setitems(obj.items())
  File "/home/suleka/anaconda3/lib/python3.6/pickle.py", line 847, in _batch_setitems
    save(v)
  File "/home/suleka/anaconda3/lib/python3.6/pickle.py", line 476, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/home/suleka/anaconda3/lib/python3.6/pickle.py", line 781, in save_list
    self._batch_appends(obj)
  File "/home/suleka/anaconda3/lib/python3.6/pickle.py", line 808, in _batch_appends
    save(tmp[0])
  File "/home/suleka/anaconda3/lib/python3.6/pickle.py", line 476, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/home/suleka/anaconda3/lib/python3.6/site-packages/ray/cloudpickle/cloudpickle.py", line 405, in save_function
    self.save_function_tuple(obj)
  File "/home/suleka/anaconda3/lib/python3.6/site-packages/ray/cloudpickle/cloudpickle.py", line 553, in save_function_tuple
    save(state)
  File "/home/suleka/anaconda3/lib/python3.6/pickle.py", line 476, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/home/suleka/anaconda3/lib/python3.6/pickle.py", line 821, in save_dict
    self._batch_setitems(obj.items())
  File "/home/suleka/anaconda3/lib/python3.6/pickle.py", line 847, in _batch_setitems
    save(v)
  File "/home/suleka/anaconda3/lib/python3.6/pickle.py", line 476, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/home/suleka/anaconda3/lib/python3.6/pickle.py", line 821, in save_dict
    self._batch_setitems(obj.items())
  File "/home/suleka/anaconda3/lib/python3.6/pickle.py", line 847, in _batch_setitems
    save(v)
  File "/home/suleka/anaconda3/lib/python3.6/pickle.py", line 476, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/home/suleka/anaconda3/lib/python3.6/site-packages/ray/cloudpickle/cloudpickle.py", line 405, in save_function
    self.save_function_tuple(obj)
  File "/home/suleka/anaconda3/lib/python3.6/site-packages/ray/cloudpickle/cloudpickle.py", line 553, in save_function_tuple
    save(state)
  File "/home/suleka/anaconda3/lib/python3.6/pickle.py", line 476, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/home/suleka/anaconda3/lib/python3.6/pickle.py", line 821, in save_dict
    self._batch_setitems(obj.items())
  File "/home/suleka/anaconda3/lib/python3.6/pickle.py", line 847, in _batch_setitems
    save(v)
  File "/home/suleka/anaconda3/lib/python3.6/pickle.py", line 476, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/home/suleka/anaconda3/lib/python3.6/pickle.py", line 821, in save_dict
    self._batch_setitems(obj.items())
  File "/home/suleka/anaconda3/lib/python3.6/pickle.py", line 847, in _batch_setitems
    save(v)
  File "/home/suleka/anaconda3/lib/python3.6/pickle.py", line 476, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/home/suleka/anaconda3/lib/python3.6/site-packages/ray/cloudpickle/cloudpickle.py", line 405, in save_function
    self.save_function_tuple(obj)
  File "/home/suleka/anaconda3/lib/python3.6/site-packages/ray/cloudpickle/cloudpickle.py", line 553, in save_function_tuple
    save(state)
  File "/home/suleka/anaconda3/lib/python3.6/pickle.py", line 476, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/home/suleka/anaconda3/lib/python3.6/pickle.py", line 821, in save_dict
    self._batch_setitems(obj.items())
  File "/home/suleka/anaconda3/lib/python3.6/pickle.py", line 847, in _batch_setitems
    save(v)
  File "/home/suleka/anaconda3/lib/python3.6/pickle.py", line 476, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/home/suleka/anaconda3/lib/python3.6/pickle.py", line 821, in save_dict
    self._batch_setitems(obj.items())
  File "/home/suleka/anaconda3/lib/python3.6/pickle.py", line 847, in _batch_setitems
    save(v)
  File "/home/suleka/anaconda3/lib/python3.6/pickle.py", line 521, in save
    self.save_reduce(obj=obj, *rv)
  File "/home/suleka/anaconda3/lib/python3.6/pickle.py", line 605, in save_reduce
    save(cls)
  File "/home/suleka/anaconda3/lib/python3.6/pickle.py", line 476, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/home/suleka/anaconda3/lib/python3.6/site-packages/ray/cloudpickle/cloudpickle.py", line 636, in save_global
    return self.save_dynamic_class(obj)
  File "/home/suleka/anaconda3/lib/python3.6/site-packages/ray/cloudpickle/cloudpickle.py", line 495, in save_dynamic_class
    save(clsdict)
  File "/home/suleka/anaconda3/lib/python3.6/pickle.py", line 476, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/home/suleka/anaconda3/lib/python3.6/pickle.py", line 821, in save_dict
    self._batch_setitems(obj.items())
  File "/home/suleka/anaconda3/lib/python3.6/pickle.py", line 847, in _batch_setitems
    save(v)
  File "/home/suleka/anaconda3/lib/python3.6/pickle.py", line 521, in save
    self.save_reduce(obj=obj, *rv)
  File "/home/suleka/anaconda3/lib/python3.6/pickle.py", line 634, in save_reduce
    save(state)
  File "/home/suleka/anaconda3/lib/python3.6/pickle.py", line 476, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/home/suleka/anaconda3/lib/python3.6/pickle.py", line 821, in save_dict
    self._batch_setitems(obj.items())
  File "/home/suleka/anaconda3/lib/python3.6/pickle.py", line 847, in _batch_setitems
    save(v)
  File "/home/suleka/anaconda3/lib/python3.6/pickle.py", line 496, in save
    rv = reduce(self.proto)
TypeError: can't pickle _thread.RLock objects

Answer 1:

You will have to create rnn_config = RNNconfig() inside the train(config, reporter=None) function. Most importantly, the tf.Graph() needs to be constructed inside train, because it is not easily pickleable: when you call register_trainable, Ray serializes the train function with cloudpickle, and everything train references at module level is serialized with it. That includes the module-level rnn_config instance and the class-level tf.Graph(), and a Graph contains a _thread.RLock, which is exactly the object the traceback says cannot be pickled.

Note that the rest of your code may also need to be adjusted accordingly.
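To make this concrete, here is a minimal sketch of the restructured trainable. This is hypothetical code, not part of the original answer: it reuses the question's RNNconfig, rnn_config, and main names, and it assumes the class-level graph = tf.Graph() attribute has been deleted from RNNconfig.

# Sketch only: assumes `graph = tf.Graph()` has been removed from the
# RNNconfig class body, so nothing unpicklable exists at import time.
def train(config, reporter=None):
    global rnn_config              # the module-level functions read this name
    rnn_config = RNNconfig()       # built per trial, inside the Tune worker
    rnn_config.status_reporter = reporter
    rnn_config.num_steps = config["num_steps"]
    rnn_config.lstm_size = config["lstm_size"]
    # ...copy the remaining hyperparameters from `config` the same way...

    # The graph is created only after the worker process has started, so
    # register_trainable() never has to pickle it (or its _thread.RLock).
    rnn_config.graph = tf.Graph()
    with rnn_config.graph.as_default():
        main()

Calling main() directly rather than through tf.app.run (which wraps main in sys.exit) is also safer inside a Tune trial. With this layout, register_trainable('train_mnist', train) only has to pickle plain Python values, and the RLock error goes away.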