tensorpack.dataflow package
class tensorpack.dataflow.DataFlow
Bases: object
Base class for all DataFlow.
get_data()
The method to generate datapoints.
Yields: list – the datapoint, i.e. a list of components.
reset_state()
Reset the state of the dataflow. It has to be called before producing datapoints.
For example, the RNG has to be reset if it is used in the DataFlow, otherwise it won't work well with prefetching, because different processes would have the same RNG state.
size()
Returns: int – size of this DataFlow.
Raises: NotImplementedError – if this DataFlow doesn't have a size.
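As a minimal sketch of the interface above, a custom DataFlow only needs to implement get_data() (and optionally size()). The class and file names here are hypothetical, not part of tensorpack:

from tensorpack.dataflow import DataFlow

class MyTextLines(DataFlow):
    """Yield one [line] datapoint per line of a text file (illustrative only)."""
    def __init__(self, path):
        with open(path) as f:
            self._lines = [l.strip() for l in f]

    def get_data(self):
        for line in self._lines:
            yield [line]            # a datapoint is a list of components

    def size(self):
        return len(self._lines)

ds = MyTextLines('lines.txt')       # hypothetical file
ds.reset_state()                    # must be called before iterating
for dp in ds.get_data():
    print(dp)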
class tensorpack.dataflow.ProxyDataFlow(ds)
Bases: tensorpack.dataflow.base.DataFlow
Base class for a DataFlow that proxies another. Every method is proxied to self.ds unless overridden by a subclass.
class tensorpack.dataflow.RNGDataFlow
Bases: tensorpack.dataflow.base.DataFlow
A DataFlow with an RNG.
exception tensorpack.dataflow.DataFlowTerminated
Bases: BaseException
An exception indicating that the DataFlow is unable to produce any more data: something went wrong so that calling get_data() cannot give a valid iterator any more. In most DataFlows this will never be raised.
class tensorpack.dataflow.TestDataSpeed(ds, size=5000, warmup=0)
Bases: tensorpack.dataflow.base.ProxyDataFlow
Test the speed of a DataFlow.
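A minimal usage sketch. It assumes the start() method used in the tensorpack tutorials, which is not documented in this excerpt; the wrapped dataflow is arbitrary:

from tensorpack.dataflow import FakeData, TestDataSpeed

ds = FakeData([[64, 64, 3]], size=1000)   # any DataFlow works here
# iterate `size` datapoints and report the speed
TestDataSpeed(ds, size=1000).start()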
class tensorpack.dataflow.PrintData(ds, num=1, label=None, name=None, max_depth=3, max_list=3)
Bases: tensorpack.dataflow.base.ProxyDataFlow
Behave like an identity mapping, but print the shape and range of the first few datapoints.
Example
To enable this debugging output, place it somewhere in your dataflow like:

def get_data():
    ds = SomeDataSource('path/to/lmdb')
    ds = SomeInscrutableMappings(ds)
    ds = PrintData(ds, num=2, max_list=2)
    return ds
ds = get_data()

The output looks like:

[0110 09:22:21 @common.py:589] DataFlow Info:
datapoint 0<2 with 4 components consists of
   0: float with value 0.0816501893251
   1: ndarray:int32 of shape (64,) in range [0, 10]
   2: ndarray:float32 of shape (64, 64) in range [-1.2248, 1.2177]
   3: list of len 50
      0: ndarray:int32 of shape (64, 64) in range [-128, 80]
      1: ndarray:float32 of shape (64, 64) in range [0.8400, 0.6845]
      ...
datapoint 1<2 with 4 components consists of
   0: float with value 5.88252075399
   1: ndarray:int32 of shape (64,) in range [0, 10]
   2: ndarray:float32 of shape (64, 64) with range [-0.9011, 0.8491]
   3: list of len 50
      0: ndarray:int32 of shape (64, 64) in range [-70, 50]
      1: ndarray:float32 of shape (64, 64) in range [0.7400, 0.3545]
      ...
class tensorpack.dataflow.BatchData(ds, batch_size, remainder=False, use_list=False)
Bases: tensorpack.dataflow.base.ProxyDataFlow
Stack datapoints into batches. It produces datapoints with the same number of components as ds, but each component gains one extra dimension of size batch_size. Each batched component is either a list of the original components, or (by default) a numpy array of them.

__init__(ds, batch_size, remainder=False, use_list=False)
Parameters:
ds (DataFlow) – when use_list=False, the components of ds must be either scalars or np.ndarray, and have to be consistent in shape.
batch_size (int) – batch size.
remainder (bool) – when the remaining datapoints in ds are not enough to form a batch, whether or not to also produce the remaining data as a smaller batch. If set to False, all produced datapoints are guaranteed to have the same batch size. If set to True, ds.size() must be accurate.
use_list (bool) – if True, each component will contain a list of datapoints instead of a numpy array with an extra dimension.
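For instance, a rough sketch of how BatchData changes the component shapes, using FakeData so the snippet is self-contained:

from tensorpack.dataflow import FakeData, BatchData

ds = FakeData([[28, 28], [10]], size=100)        # each datapoint: [ndarray(28, 28), ndarray(10,)]
ds = BatchData(ds, batch_size=32, remainder=True)
ds.reset_state()
for dp in ds.get_data():
    # dp[0].shape == (32, 28, 28) and dp[1].shape == (32, 10),
    # except the final smaller batch of 4, kept because remainder=True
    print(dp[0].shape, dp[1].shape)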
class tensorpack.dataflow.BatchDataByShape(ds, batch_size, idx)
Bases: tensorpack.dataflow.common.BatchData
Group datapoints of the same shape into batches. It doesn't require the input DataFlow to be homogeneous: the datapoints can have different shapes, and batches will be formed from those that share the same shape.
Note
It is implemented with a dict {shape -> datapoints}. Datapoints of uncommon shapes may never accumulate enough to form a batch, and thus may never get produced.
class tensorpack.dataflow.FixedSizeData(ds, size, keep_state=True)
Bases: tensorpack.dataflow.base.ProxyDataFlow
Generate data from another DataFlow, but with a fixed total count.
class tensorpack.dataflow.MapData(ds, func)
Bases: tensorpack.dataflow.base.ProxyDataFlow
Apply a mapper/filter on the DataFlow.
Note
Please make sure func doesn't modify the components unless you're certain it's safe.
If you discard some datapoints, ds.size() will be incorrect.
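A small illustrative sketch: the mapping function returns a new datapoint, or None to discard one (the filtering-by-None behavior matches the "discard some datapoints" note above); the threshold here is arbitrary:

from tensorpack.dataflow import FakeData, MapData

ds = FakeData([[4, 4]], size=100)

def square_or_drop(dp):
    img = dp[0]
    if img.mean() < 0.2:        # filter: returning None discards the datapoint
        return None
    return [img ** 2]           # mapper: return the new datapoint (a list of components)

ds = MapData(ds, square_or_drop)
ds.reset_state()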
class tensorpack.dataflow.MapDataComponent(ds, func, index=0)
Bases: tensorpack.dataflow.common.MapData
Apply a mapper/filter on one component of the datapoints.
Note
This dataflow itself doesn't modify the datapoints, but please make sure func doesn't modify the components unless you're certain it's safe.
If you discard some datapoints, ds.size() will be incorrect.
class tensorpack.dataflow.RepeatedData(ds, nr)
Bases: tensorpack.dataflow.base.ProxyDataFlow
Take datapoints from another DataFlow and produce them until it has been exhausted a certain number of times, i.e.: dp1, dp2, ..., dpn, dp1, dp2, ..., dpn.

size()
Raises: ValueError – when nr == -1.
class tensorpack.dataflow.RepeatedDataPoint(ds, nr)
Bases: tensorpack.dataflow.base.ProxyDataFlow
Take datapoints from another DataFlow and produce each of them a certain number of times, i.e.: dp1, dp1, ..., dp1, dp2, ..., dp2, ...
class tensorpack.dataflow.RandomChooseData(df_lists)
Bases: tensorpack.dataflow.base.RNGDataFlow
Randomly choose from several DataFlows. Stop producing when any of them is exhausted.
class tensorpack.dataflow.RandomMixData(df_lists)
Bases: tensorpack.dataflow.base.RNGDataFlow
Perfectly mix datapoints from several DataFlows using their size(). Will stop when all DataFlows are exhausted.
class tensorpack.dataflow.JoinData(df_lists)
Bases: tensorpack.dataflow.base.DataFlow
Join the components from each DataFlow.
Examples:
df1 produces: [c1, c2]
df2 produces: [c3, c4]
joined: [c1, c2, c3, c4]
class tensorpack.dataflow.ConcatData(df_lists)
Bases: tensorpack.dataflow.base.DataFlow
Concatenate several DataFlows. Produce datapoints from each DataFlow and go to the next when one is exhausted.
tensorpack.dataflow.SelectComponent(ds, idxs)
Select / reorder components from datapoints.
Parameters: ds (DataFlow) – input DataFlow; idxs (list[int]) – indices of the components to select.
Example:
original df produces: [c1, c2, c3]
idxs: [2, 1]
this df produces: [c3, c2]
class tensorpack.dataflow.LocallyShuffleData(ds, buffer_size, nr_reuse=1, shuffle_interval=None)
Bases: tensorpack.dataflow.base.ProxyDataFlow, tensorpack.dataflow.base.RNGDataFlow
Maintain a pool to buffer datapoints, and shuffle them before producing them. This can be used as an alternative when a complete random read is too expensive or impossible for the data source.

__init__(ds, buffer_size, nr_reuse=1, shuffle_interval=None)
Parameters:
ds (DataFlow) – input DataFlow.
buffer_size (int) – size of the buffer.
nr_reuse (int) – reuse each datapoint this many times to improve speed, but it may hurt your model.
shuffle_interval (int) – shuffle the buffer after this many datapoints have gone through it. Frequent shuffling of a large buffer may affect speed, but infrequent shuffling may affect randomness. Defaults to buffer_size / 3.
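As an illustration, a typical way to add approximate shuffling to a sequential reader; the LMDB path and buffer size are just examples:

from tensorpack.dataflow import LMDBDataPoint, LocallyShuffleData, BatchData

ds = LMDBDataPoint('/path/to/data.lmdb', shuffle=False)  # sequential read
ds = LocallyShuffleData(ds, buffer_size=10000)           # shuffle within a 10k-datapoint window
ds = BatchData(ds, 64)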
class tensorpack.dataflow.CacheData(ds, shuffle=False)
Bases: tensorpack.dataflow.base.ProxyDataFlow
Cache the first pass of a DataFlow completely in memory, and produce from the cache thereafter.
NOTE: The user should not stop the iterator before it has reached the end. Otherwise the cache may be incomplete.
class tensorpack.dataflow.HDF5Data(filename, data_paths, shuffle=True)
Bases: tensorpack.dataflow.base.RNGDataFlow
Zip data from different paths in an HDF5 file.
Warning
The current implementation will load all data into memory. (TODO)
class tensorpack.dataflow.LMDBData(lmdb_path, shuffle=True, keys=None)
Bases: tensorpack.dataflow.base.RNGDataFlow
Read an LMDB database and produce (k, v) raw-bytes pairs. The raw bytes are usually not what you're interested in. You might want to use LMDBDataDecoder, LMDBDataPoint, or apply a mapper function after LMDBData.

__init__(lmdb_path, shuffle=True, keys=None)
Parameters:
lmdb_path (str) – a directory or a file.
shuffle (bool) – shuffle the keys or not.
keys (list[str] or str) – a list of str to use as the keys, used only when shuffle is True. It can also be a format string, e.g. {:0>8d}, which will be formatted with the indices from 0 to total_size - 1. If not provided, it will look in the database for __keys__, which dump_dataflow_to_lmdb() used to store the list of keys. If still not found, it will iterate over the database to find all the keys.
class tensorpack.dataflow.LMDBDataDecoder(lmdb_data, decoder)
Bases: tensorpack.dataflow.common.MapData
Read an LMDB database and produce a decoded output.
class tensorpack.dataflow.LMDBDataPoint(*args, **kwargs)
Bases: tensorpack.dataflow.common.MapData
Read an LMDB file and produce deserialized datapoints. It only accepts a database produced by tensorpack.dataflow.dftools.dump_dataflow_to_lmdb(), which uses tensorpack.utils.serialize.dumps() for serialization.
Example

ds = LMDBDataPoint("/data/ImageNet.lmdb", shuffle=False)  # read and decode

# The above is equivalent to:
ds = LMDBData("/data/ImageNet.lmdb", shuffle=False)  # read
ds = LMDBDataPoint(ds)  # decode
# Sometimes it makes sense to separate reading and decoding
# to be able to make decoding parallel.
tensorpack.dataflow.CaffeLMDB(lmdb_path, shuffle=True, keys=None)
Read a Caffe LMDB file where each value contains a caffe.Datum protobuf. Produces datapoints of the format [HWC image, label]. Note that the Caffe LMDB format is not efficient: it stores serialized raw arrays rather than JPEG images.
Parameters: lmdb_path, shuffle, keys – same as LMDBData.
Returns: a LMDBDataDecoder instance.
Example

ds = CaffeLMDB("/tmp/validation", keys='{:0>8d}')
class tensorpack.dataflow.SVMLightData(filename, shuffle=True)
Bases: tensorpack.dataflow.base.RNGDataFlow
Read X, y from an svmlight file, and produce [X_i, y_i] pairs.
class tensorpack.dataflow.TFRecordData(path, size=None)
Bases: tensorpack.dataflow.base.DataFlow
Produce datapoints from a TFRecord file, assuming each record was serialized by serialize.dumps(). This class works with dftools.dump_dataflow_to_tfrecord().
class tensorpack.dataflow.ImageFromFile(files, channel=3, resize=None, shuffle=False)
Bases: tensorpack.dataflow.base.RNGDataFlow
Produce images read from a list of files.
class tensorpack.dataflow.AugmentImageComponent(ds, augmentors, index=0, copy=True, catch_exceptions=False)
Bases: tensorpack.dataflow.common.MapDataComponent
Apply image augmentors on one image component.

__init__(ds, augmentors, index=0, copy=True, catch_exceptions=False)
Parameters:
ds (DataFlow) – input DataFlow.
augmentors (AugmentorList) – a list of imgaug.ImageAugmentor to be applied in order.
index (int) – the index of the image component to be augmented in the datapoint.
copy (bool) – some augmentors modify the input images. When copy is True, a copy will be made before any augmentors are applied, so the original images are not modified. Turn it off to save time when you know it's OK.
catch_exceptions (bool) – when set to True, catch all exceptions and only warn you when there are too many (>100). Can be used to ignore occasional errors in the data.
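A rough usage sketch, assuming the image is the first component of each datapoint; the file list and the particular augmentor list are only examples:

from tensorpack.dataflow import ImageFromFile, AugmentImageComponent, imgaug

files = ['a.jpg', 'b.jpg']                       # hypothetical image paths
ds = ImageFromFile(files, channel=3)
augs = [imgaug.Flip(horiz=True),
        imgaug.Resize((224, 224))]
ds = AugmentImageComponent(ds, augs, index=0)    # augment the image component only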
class tensorpack.dataflow.AugmentImageCoordinates(ds, augmentors, img_index=0, coords_index=1, copy=True, catch_exceptions=False)
Bases: tensorpack.dataflow.common.MapData
Apply image augmentors on an image and a list of coordinates. The coordinates must be an Nx2 floating-point array, where each row is (x, y).

__init__(ds, augmentors, img_index=0, coords_index=1, copy=True, catch_exceptions=False)
Parameters:
ds (DataFlow) – input DataFlow.
augmentors (AugmentorList) – a list of imgaug.ImageAugmentor to be applied in order.
img_index (int) – the index of the image component to be augmented.
coords_index (int) – the index of the coordinate component to be augmented.
copy, catch_exceptions – same as in AugmentImageComponent.
class tensorpack.dataflow.AugmentImageComponents(ds, augmentors, index=(0, 1), coords_index=(), copy=True, catch_exceptions=False)
Bases: tensorpack.dataflow.common.MapData
Apply image augmentors on several components, with shared augmentation parameters.
Example

ds = MyDataFlow()  # produce [image(HWC), segmask(HW), keypoint(Nx2)]
ds = AugmentImageComponents(
    ds, augs,
    index=(0, 1), coords_index=(2,))

__init__(ds, augmentors, index=(0, 1), coords_index=(), copy=True, catch_exceptions=False)
Parameters:
ds (DataFlow) – input DataFlow.
augmentors (AugmentorList) – a list of imgaug.ImageAugmentor instances to be applied in order.
index – tuple of indices of the image components.
coords_index – tuple of indices of the coordinate components.
copy, catch_exceptions – same as in AugmentImageComponent.
tensorpack.dataflow.PrefetchData
alias of MultiProcessPrefetchData
class tensorpack.dataflow.MultiProcessPrefetchData(ds, nr_prefetch, nr_proc)
Bases: tensorpack.dataflow.base.ProxyDataFlow
Prefetch data from a DataFlow using Python multiprocessing utilities. It forks the process that calls __init__(), and collects datapoints from ds in each process through a Python multiprocessing.Queue.
Note
An iterator cannot run faster automatically – what's happening is that the underlying dataflow will be forked nr_proc times. As a result, we have the following guarantee on the dataflow correctness:
When nr_proc=1, the dataflow produces the same data as ds, in the same order.
When nr_proc>1, the dataflow produces the same distribution of data as ds if each sample from ds is i.i.d. (e.g. fully shuffled). You probably only want to use it for training.
This has more serialization overhead than PrefetchDataZMQ when the data is large.
You can nest like this: PrefetchDataZMQ(PrefetchData(df, nr_proc=a), nr_proc=b). A total of a instances of df worker processes will be created.
The fork happens in __init__. reset_state() is a no-op. The worker processes won't get called.
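A minimal sketch of the basic (non-nested) usage; the dataflow and the prefetch/process counts are just examples:

from tensorpack.dataflow import FakeData, PrefetchData

ds = FakeData([[224, 224, 3]], size=1000)
# keep up to 256 datapoints prefetched, produced by 4 forked copies of ds
ds = PrefetchData(ds, nr_prefetch=256, nr_proc=4)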
class tensorpack.dataflow.PrefetchDataZMQ(ds, nr_proc=1, hwm=50)
Bases: tensorpack.dataflow.parallel._MultiProcessZMQDataFlow
Prefetch data from a DataFlow using multiple processes, with ZeroMQ for communication. It forks the process that calls reset_state(), and collects datapoints from the given dataflow in each process through a ZeroMQ IPC pipe.
Note
An iterator cannot run faster automatically – what's happening is that the underlying dataflow will be forked nr_proc times. As a result, we have the following guarantee on the dataflow correctness:
- When nr_proc=1, this dataflow produces the same data as the given dataflow, in the same order.
- When nr_proc>1, if each sample from the given dataflow is i.i.d. (e.g. fully shuffled), then this dataflow produces the same distribution of data as the given dataflow. This implies that there will be duplication, reordering, etc. You probably only want to use it for training. If the samples are not i.i.d., the behavior is undefined.
reset_state() of the given dataflow will be called once and only once in the worker processes.
The fork of processes happens in this dataflow's reset_state() method. Please note that forking a TensorFlow GPU session may be unsafe. If you're managing this dataflow on your own, it's better to fork before creating the session.
After the fork has happened, this dataflow becomes not fork-safe: if you fork an already-reset instance of this dataflow, it won't be usable in the forked process.
Do not nest two PrefetchDataZMQ.
By default, a UNIX named pipe will be created in the current directory. However, certain non-local filesystems such as NFS/GlusterFS/AFS don't always support pipes. You can change the directory with export TENSORPACK_PIPEDIR=/other/dir. In particular, you can use somewhere under '/tmp', which is usually local.
Note that some non-local filesystems may appear to support pipes and the code may appear to run, but then crash with bizarre errors. Also note that ZMQ limits the maximum length of the pipe path. If you hit the limit, you can set the directory to a symlink that points to a local directory.
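A minimal sketch of wrapping a training pipeline, following the notes above; the dataflow and worker count are examples:

from tensorpack.dataflow import FakeData, BatchData, PrefetchDataZMQ

ds = FakeData([[224, 224, 3], [1]], size=1000)
ds = BatchData(ds, 32)
ds = PrefetchDataZMQ(ds, nr_proc=4)   # 4 worker processes, forked in reset_state()
ds.reset_state()                      # fork happens here; avoid doing this after creating a GPU session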
class tensorpack.dataflow.MultiThreadPrefetchData(get_df, nr_prefetch, nr_thread)
Bases: tensorpack.dataflow.base.DataFlow
Create multiple dataflow instances and run each of them in one thread. Collect the outputs with a queue.
tensorpack.dataflow.ThreadedMapData
alias of MultiThreadMapData
class tensorpack.dataflow.MultiThreadMapData(ds, nr_thread, map_func, buffer_size=200, strict=False)
Bases: tensorpack.dataflow.parallel_map._ParallelMapData
Same as MapData, but start threads to run the mapping function. This is useful when the mapping function is the bottleneck, but you don't want to start processes for the entire dataflow pipeline.
Note
There is only a tiny communication overhead with threads, but you should avoid starting many threads in your main process to reduce GIL contention.
The threads will only start in the process which calls reset_state(). Therefore you can use PrefetchDataZMQ(MultiThreadMapData(...), 1) to reduce GIL contention.
Threads run in parallel and can take different amounts of time to run the mapping function. Therefore the order of datapoints won't be preserved, and datapoints from one pass of df.get_data() might get mixed with datapoints from the next pass.
You can use strict mode, where MultiThreadMapData.get_data() is guaranteed to produce exactly the set that df.get_data() produces, although the order of the data still isn't preserved.
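For example, a sketch of parallelizing an expensive per-datapoint decode step with threads, following the note above; the file list is hypothetical and OpenCV is only one possible decoder:

import cv2
from tensorpack.dataflow import DataFromList, MultiThreadMapData, PrefetchDataZMQ

files = ['a.jpg', 'b.jpg']                        # hypothetical list of paths
ds = DataFromList([[f] for f in files])           # each datapoint: [filename]

def read_and_decode(dp):
    img = cv2.imread(dp[0])
    return [img] if img is not None else None     # None drops unreadable files

ds = MultiThreadMapData(ds, nr_thread=8, map_func=read_and_decode, buffer_size=200)
ds = PrefetchDataZMQ(ds, 1)   # move the whole pipeline to one separate process, as suggested above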
tensorpack.dataflow.MultiProcessMapData
alias of MultiProcessMapDataZMQ
class tensorpack.dataflow.MultiProcessMapDataZMQ(ds, nr_proc, map_func, buffer_size=200, strict=False)
Bases: tensorpack.dataflow.parallel_map._ParallelMapData, tensorpack.dataflow.parallel._MultiProcessZMQDataFlow
Same as MapData, but start processes to run the mapping function, and communicate with ZeroMQ pipes.
Note
Processes run in parallel and can take different amounts of time to run the mapping function. Therefore the order of datapoints won't be preserved, and datapoints from one pass of df.get_data() might get mixed with datapoints from the next pass.
You can use strict mode, where MultiProcessMapData.get_data() is guaranteed to produce exactly the set that df.get_data() produces, although the order of the data still isn't preserved.
class tensorpack.dataflow.FakeData(shapes, size=1000, random=True, dtype='float32', domain=(0, 1))
Bases: tensorpack.dataflow.base.RNGDataFlow
Generate fake data of the given shapes.

__init__(shapes, size=1000, random=True, dtype='float32', domain=(0, 1))
Parameters:
shapes (list) – a list of lists/tuples, the shapes of each component.
size (int) – size of this DataFlow.
random (bool) – whether to randomly generate data every iteration. Note that merely generating the data could sometimes be time-consuming!
dtype (str or list) – data type as a string, or a list of data types.
domain (tuple or list) – a (min, max) tuple, or a list of such tuples.
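For instance, a sketch of faking an ImageNet-like input to benchmark the rest of a pipeline; the shapes, dtypes, and domains are only examples:

from tensorpack.dataflow import FakeData, BatchData

# [image, label]: float32 images in [0, 255] and int32 labels in [0, 1000]
ds = FakeData([[224, 224, 3], [1]], size=1280, random=False,
              dtype=['float32', 'int32'], domain=[(0, 255), (0, 1000)])
ds = BatchData(ds, 64)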
class tensorpack.dataflow.DataFromQueue(queue)
Bases: tensorpack.dataflow.base.DataFlow
Produce data from a queue.
class tensorpack.dataflow.DataFromList(lst, shuffle=True)
Bases: tensorpack.dataflow.base.RNGDataFlow
Wrap a list of datapoints into a DataFlow.
class tensorpack.dataflow.DataFromGenerator(gen, size=None)
Bases: tensorpack.dataflow.base.DataFlow
Wrap a generator into a DataFlow.
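A tiny sketch of wrapping a Python generator. This passes the generator object itself; whether gen may instead be a callable returning a generator varies across tensorpack versions, so treat that detail as an assumption. size is optional and only needed if downstream components call size():

from tensorpack.dataflow import DataFromGenerator

def gen():
    i = 0
    while True:
        yield [i, i * i]     # each yielded item is one datapoint (a list of components)
        i += 1

ds = DataFromGenerator(gen(), size=None)
ds.reset_state()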
class tensorpack.dataflow.DataFromIterable(iterable)
Bases: tensorpack.dataflow.base.DataFlow
Wrap an iterable of datapoints into a DataFlow.
tensorpack.dataflow.send_dataflow_zmq(df, addr, hwm=50, format=None, bind=False)
Run a DataFlow and send its data to a ZMQ socket at addr. It will connect to this addr, then serialize and send each datapoint to it with a PUSH socket. This function never returns unless an error is encountered.
Parameters:
df (DataFlow) – will loop over the DataFlow infinitely.
addr – a ZMQ socket endpoint.
hwm (int) – ZMQ high-water mark (buffer size).
format (str) – the serialization format. The default format uses tensorpack.utils.serialize. An alternate format is 'zmq_ops', used by https://github.com/tensorpack/zmq_ops.
bind (bool) – whether to bind or connect to the endpoint.
class tensorpack.dataflow.RemoteDataZMQ(addr1, addr2=None, hwm=50, bind=True)
Bases: tensorpack.dataflow.base.DataFlow
Produce data from ZMQ PULL socket(s). See http://tensorpack.readthedocs.io/en/latest/tutorial/efficient-dataflow.html#distributed-dataflow

cnt1, cnt2
int – number of datapoints received from addr1 and addr2, respectively.
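A sketch of the two ends of the distributed setup described above; the hostnames, port, and dataflow are only examples (the receiver binds by default, the sender connects):

# On the machine that generates data:
from tensorpack.dataflow import FakeData, send_dataflow_zmq
ds = FakeData([[224, 224, 3]], size=1000)
send_dataflow_zmq(ds, 'tcp://training-host:8877')     # loops forever, PUSHing datapoints

# On the training machine:
from tensorpack.dataflow import RemoteDataZMQ
ds = RemoteDataZMQ('tcp://0.0.0.0:8877', bind=True)   # PULL socket
ds.reset_state()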
tensorpack.dataflow.dftools module
tensorpack.dataflow.dftools.dump_dataflow_to_process_queue(df, size, nr_consumer)
Convert a DataFlow to a multiprocessing.Queue. The DataFlow will only be reset in the spawned process.
Parameters:
df (DataFlow) – the DataFlow to dump.
size (int) – size of the queue.
nr_consumer (int) – number of consumer processes.
Returns: tuple(queue, process) – the process will take data from df and fill the queue, once you start it. Each element in the queue is (idx, dp). idx can be the DIE sentinel when df is exhausted.
tensorpack.dataflow.dftools.dump_dataflow_to_lmdb(df, lmdb_path, write_frequency=5000)
Dump a DataFlow to an LMDB database, where the keys are indices and the values are serialized datapoints. The output database can be read directly by tensorpack.dataflow.LMDBDataPoint.
Parameters:
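For example, a minimal sketch of writing a dataflow out and reading it back; the paths and the FakeData source are only examples:

from tensorpack.dataflow import FakeData, LMDBDataPoint
from tensorpack.dataflow.dftools import dump_dataflow_to_lmdb

ds = FakeData([[64, 64, 3], [1]], size=1000)
dump_dataflow_to_lmdb(ds, '/tmp/fake.lmdb', write_frequency=5000)

# later, read it back:
ds = LMDBDataPoint('/tmp/fake.lmdb', shuffle=True)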