我正在为 tf.data.Dataset 编写一个 from_indexable(参见 https://github.com/tensorflow/tensorflow/issues/14448)。
from_indexable 的优点是可以并行处理,而 Python 生成器不能并行处理。
from_indexable 函数先创建一个 tf.data.Dataset.range,再将可索引对象包装在通用的 tf.py_func 中,并通过 map 函数调用它。
对于那些现在就想使用 from_indexable 的人,这里是库代码:
import tensorflow as tf
import numpy as np
from tensorflow.python.framework import tensor_shape
from tensorflow.python.util import nest
def py_func_decorator(output_types=None, output_shapes=None, stateful=True, name=None):
    """Build a decorator that runs a Python function as a graph op via ``tf.py_func``.

    Args:
        output_types: Possibly nested structure of dtypes. It is flattened and
            passed to ``tf.py_func`` as ``Tout``; the result is packed back
            into this structure.
        output_shapes: Optional structure of shapes, matched against
            ``output_types``. When given, each returned tensor gets the
            corresponding static shape via ``set_shape``.
        stateful: Forwarded unchanged to ``tf.py_func``.
        name: Forwarded unchanged to ``tf.py_func``.

    Returns:
        A decorator. The decorated function accepts positional arguments only,
        which are handed to ``tf.py_func`` as its inputs.
    """
    def decorator(func):
        def call(*args):
            flat_output_types = nest.flatten(output_types)
            flat_values = tf.py_func(
                func,
                inp=args,
                Tout=flat_output_types,
                stateful=stateful,
                name=name,
            )
            if output_shapes is not None:
                # Normalise into a local variable. Rebinding the closed-over
                # `output_shapes` (via `nonlocal`) would mutate the decorator's
                # shared state on every call for no benefit.
                shapes = nest.map_structure_up_to(
                    output_types, tensor_shape.as_shape, output_shapes)
                flattened_shapes = nest.flatten_up_to(output_types, shapes)
                for ret_t, shape in zip(flat_values, flattened_shapes):
                    ret_t.set_shape(shape)
            return nest.pack_sequence_as(output_types, flat_values)
        return call
    return decorator
def from_indexable(iterator, output_types, output_shapes=None, num_parallel_calls=None, stateful=True, name=None):
    """Create a dataset whose elements are ``iterator[0] .. iterator[len(iterator) - 1]``.

    A ``tf.data.Dataset.range`` over the valid indices is mapped through a
    py_func-wrapped lookup into ``iterator``.

    Args:
        iterator: Object supporting ``len()`` and integer indexing.
        output_types: Dtype structure of one entry, forwarded to
            ``py_func_decorator``.
        output_shapes: Optional shape structure of one entry, forwarded to
            ``py_func_decorator``.
        num_parallel_calls: Forwarded to ``Dataset.map``.
        stateful: Forwarded to ``py_func_decorator``.
        name: Forwarded to ``py_func_decorator``.

    Returns:
        The mapped dataset.
    """
    index_dataset = tf.data.Dataset.range(len(iterator))
    as_graph_op = py_func_decorator(
        output_types, output_shapes, stateful=stateful, name=name)

    def index_to_entry(index):
        return iterator[index]

    return index_dataset.map(
        as_graph_op(index_to_entry), num_parallel_calls=num_parallel_calls)
这里有一个例子(注意:from_indexable 有一个 num_parallel_calls 参数):
class PyDataSet:
    """Toy indexable data source used to demonstrate ``from_indexable``.

    It has 20 entries; entry ``i`` is a random array of shape ``(i + 1, 10)``.
    """

    def __len__(self):
        """Return the fixed number of entries."""
        return 20

    def __getitem__(self, item):
        """Return a freshly sampled normal array of shape ``(item + 1, 10)``.

        Raises:
            IndexError: If ``item`` is outside ``[0, len(self))``. Without
                this, plain iteration over the object (which relies on
                ``IndexError`` to stop) would never terminate.
        """
        if not 0 <= item < len(self):
            raise IndexError("PyDataSet index out of range")
        return np.random.normal(size=(item + 1, 10))
# Wrap the toy data source; the leading dimension varies per entry, hence None.
ds = from_indexable(
    PyDataSet(),
    output_types=tf.float64,
    output_shapes=[None, 10],
)
it = ds.make_one_shot_iterator()
entry = it.get_next()

with tf.Session() as sess:
    # Fetch two entries and print the shape of each.
    for _ in range(2):
        print(sess.run(entry).shape)
更新(2018年6月10日):
自 https://github.com/tensorflow/tensorflow/pull/15121 合并后,from_indexable 的代码变得更简单:
import tensorflow as tf
def py_func_decorator(output_types=None, output_shapes=None, stateful=True, name=None):
    """Build a decorator that routes a function through ``tf.contrib.framework.py_func``.

    Args:
        output_types: Forwarded to ``py_func`` as ``output_types``.
        output_shapes: Forwarded to ``py_func`` as ``output_shapes``.
        stateful: Forwarded to ``py_func`` as ``stateful``.
        name: Forwarded to ``py_func`` as ``name``.

    Returns:
        A decorator. The decorated function forwards its positional and
        keyword arguments to ``py_func`` as ``args`` and ``kwargs``.
    """
    # These options are fixed per decorator, so collect them once.
    op_options = dict(
        output_types=output_types,
        output_shapes=output_shapes,
        stateful=stateful,
        name=name,
    )

    def decorator(func):
        def call(*args, **kwargs):
            return tf.contrib.framework.py_func(
                func=func, args=args, kwargs=kwargs, **op_options)
        return call

    return decorator
def from_indexable(iterator, output_types, output_shapes=None, num_parallel_calls=None, stateful=True, name=None):
    """Turn an indexable object into a dataset of its entries.

    The dataset is a ``tf.data.Dataset.range`` over ``len(iterator)`` indices,
    mapped through a py_func-wrapped ``iterator[index]`` lookup.

    Args:
        iterator: Object supporting ``len()`` and integer indexing.
        output_types: Dtype structure of one entry, passed to
            ``py_func_decorator``.
        output_shapes: Optional shape structure of one entry, passed to
            ``py_func_decorator``.
        num_parallel_calls: Passed to ``Dataset.map``.
        stateful: Passed to ``py_func_decorator``.
        name: Passed to ``py_func_decorator``.

    Returns:
        The mapped dataset.
    """
    indices = tf.data.Dataset.range(len(iterator))

    def index_to_entry(index):
        return iterator[index]

    # Apply the decorator explicitly rather than with @-syntax.
    fetch_entry = py_func_decorator(
        output_types, output_shapes, stateful=stateful, name=name
    )(index_to_entry)
    return indices.map(fetch_entry, num_parallel_calls=num_parallel_calls)
使用 tf.py_func() 实现并行化可能不会提高速度,请参考此答案。 - mikkola
from_generator 中是否添加了 num_parallel_calls 参数? - Rylan Schaeffer