This is a typical way of handling sparse features (such as some ID features) in recommendation system. I'm looking for a convenient way to prepare the data for TensorFlow pipeline.
I did lots of search, but yet find the good solution yet.
Below is the one which seems to be close to what I need, but not working yet.
See #######
part below
The data file is like:
csv = [
'1221,cc,1',
'213,aa|cc|ff,1',
]
for the second row, i need some SparseTensor like multi-hot
aa bb cc dd ee ff
| 0 0 1 0 0 0 |
| 1 0 1 0 0 1 |
The full version of code is:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
import os
import shutil
import sys
import tensorflow as tf # pylint: disable=g-bad-import-order
_CSV_COLUMNS = ['a_id', 'b_id', 'tags', 'label']
_CSV_COLUMN_DEFAULTS = [[0], [0], [''], [0]]
def input_fn(data_file, num_epochs, shuffle, batch_size):
"""Generate an input function for the Estimator."""
assert tf.gfile.Exists(data_file), (
'%s not found. Please make sure you have run data_download.py and '
'set the --data_dir argument to the correct path.' % data_file)
"""
$ cat vocab.txt
a
b
c
d
e
f
g
h
i
j
k
l
m
n
"""
table = tf.contrib.lookup.index_table_from_file(
vocabulary_file='vocab.txt', num_oov_buckets=1)
def parse_csv(value):
print('Parsing', data_file)
columns = tf.decode_csv(value, record_defaults=_CSV_COLUMN_DEFAULTS)
features = dict(zip(_CSV_COLUMNS, columns))
######################## BEGIN ###########################
# support multi-hot sparse features
split_tags = tf.string_split([columns[2]], '|') # hard-coded 'tags' column index
# Output: tags.indices Tensor("StringSplit:0", shape=(?, 2), dtype=int64)
print('tags.indices', split_tags.indices)
indice_idx = tf.map_fn(lambda x : x[0], split_tags.indices)
# Output: indice_idx Tensor("map/TensorArrayStack/TensorArrayGatherV3:0", shape=(?,), dtype=int64)
print('indice_idx', indice_idx)
value_idx = tf.map_fn(lambda x : x[1], split_tags.indices)
value_arr = tf.cast(tf.gather(split_tags.values, value_idx), tf.int64)
# Output: value_arr shape (?,)
print('value_arr shape', value_arr.shape)
# stack is doing: [1, 2, 3], [4, 5, 6] ==> [[1, 2], [3, 4], [5,6]]
new_indices = tf.stack([indice_idx, value_arr], axis=1)
print('new_indices', new_indices)
new_values = tf.ones_like(value_arr)
# Output: new_values Tensor("ones_like:0", shape=(?,), dtype=int64)
print('new_values', new_values)
with tf.Session() as s1:
s1.run([tf.global_variables_initializer(), tf.tables_initializer()])
##### FAIL here #####
# InvalidArgumentError (see above for traceback): You must feed a value for placeholder tensor 'arg0' with dtype string
# [[Node: arg0 = Placeholder[dtype=DT_STRING, shape=<unknown>, _device="/job:localhost/replica:0/task:0/device:CPU:0"]()]]
print(split_tags.values.eval())
print(indice_idx.eval())
print('value_arr', value_arr.eval())
print('new_values', new_values.eval())
categorial_tensor = tf.SparseTensor(
indices=new_indices,
values=new_values,
dense_shape=[new_indices.shape[1], 4])
######################## END ###########################
categorical_cols = {
'tags': categorial_tensor}
features.update(categorical_cols)
labels = features.pop('label')
return features, tf.equal(labels, 1)
# Extract lines from input files using the Dataset API.
dataset = tf.data.TextLineDataset(data_file)
if shuffle:
dataset = dataset.shuffle(buffer_size=6) # num of lines in the file
dataset = dataset.map(parse_csv, num_parallel_calls=5)
# We call repeat after shuffling, rather than before, to prevent separate
# epochs from blending together.
dataset = dataset.repeat(num_epochs)
dataset = dataset.batch(batch_size)
return dataset
"""
$ cat data.csv
1,2,a|c|g,1
0,1,c|f,0
0,2,b|g,1
0,1,b|v,0
0,1,g|j|k|l,1
0,1,a,0
"""
train_file = 'data.csv'
epochs_between_evals = 2
batch_size = 40
ds = input_fn(train_file, epochs_between_evals, True, batch_size)
with tf.Session() as s:
s.run([tf.global_variables_initializer(), tf.tables_initializer()])
print(s.run(ds))