tensorpack.dataflow package

Module contents

class tensorpack.dataflow.DataFlow[source]

Bases: object

Base class for all DataFlow

get_data()[source]

The method to generate datapoints.

Yields:list – The datapoint, i.e. list of components.
reset_state()[source]

Reset state of the dataflow. It has to be called before producing datapoints.

For example, RNG has to be reset if used in the DataFlow, otherwise it won’t work well with prefetching, because different processes will have the same RNG state.

size()[source]
Returns:int – size of this data flow.
Raises:NotImplementedError if this DataFlow doesn’t have a size.
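
Example

A minimal sketch (the class and names are illustrative, not part of the library) of how a custom DataFlow is typically implemented: yield one list of components per datapoint from get_data(), and report the length in size():

class MyDataFlow(DataFlow):
    def __init__(self, n):
        self._n = n

    def size(self):
        return self._n

    def get_data(self):
        for i in range(self._n):
            # each datapoint is a list of components
            yield [i, i * i]
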
class tensorpack.dataflow.ProxyDataFlow(ds)[source]

Bases: tensorpack.dataflow.base.DataFlow

Base class for a DataFlow that proxies another DataFlow.

__init__(ds)[source]
Parameters:ds (DataFlow) – DataFlow to proxy.
get_data()[source]
reset_state()[source]

Reset state of the proxied DataFlow.

size()[source]
class tensorpack.dataflow.RNGDataFlow[source]

Bases: tensorpack.dataflow.base.DataFlow

A DataFlow with RNG

reset_state()[source]

Reset the RNG

class tensorpack.dataflow.TestDataSpeed(ds, size=5000)[source]

Bases: tensorpack.dataflow.base.ProxyDataFlow

Test the speed of some DataFlow

__init__(ds, size=5000)[source]
Parameters:
  • ds (DataFlow) – the DataFlow to test.

  • size (int) – number of datapoints to fetch.

get_data()[source]

Will start testing at the beginning, then produce data normally.

start()[source]

Alias of start_test.

start_test()[source]

Start testing with a progress bar.
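
Example

A typical way to use it (a sketch; get_some_dataflow is a hypothetical helper standing in for whatever DataFlow you want to benchmark):

ds = get_some_dataflow()
TestDataSpeed(ds, size=5000).start_test()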

class tensorpack.dataflow.PrintData(ds, num=1, label=None, name=None)[source]

Bases: tensorpack.dataflow.base.ProxyDataFlow

Behave like an identity mapping but print shape and range of the first datapoint once during construction.

label

str – label to identify the data when using this debugging output in multiple places.

num

int – number of iterations

Example

To enable this debugging output, you should place it somewhere in your dataflow like

def get_data():
    ds = CaffeLMDB('path/to/lmdb')
    ds = SomeInscrutableMappings(ds)
    ds = PrintData(ds, num=2)
    return ds
ds = get_data()

The output looks like:

[0110 09:22:21 @common.py:589] DataFlow Info:
datapoint 0<2 with 4 components consists of
   dp 0: is float of shape () with range [0.0816501893251]
   dp 1: is ndarray of shape (64, 64) with range [0.1300, 0.6895]
   dp 2: is ndarray of shape (64, 64) with range [-1.2248, 1.2177]
   dp 3: is ndarray of shape (9, 9) with range [-0.6045, 0.6045]
datapoint 1<2 with 4 components consists of
   dp 0: is float of shape () with range [5.88252075399]
   dp 1: is ndarray of shape (64, 64) with range [0.0072, 0.9371]
   dp 2: is ndarray of shape (64, 64) with range [-0.9011, 0.8491]
   dp 3: is ndarray of shape (9, 9) with range [-0.5585, 0.5585]
__init__(ds, num=1, label=None, name=None)[source]
Parameters:
  • ds (DataFlow) – input DataFlow.

  • num (int) – number of datapoints to print.

  • label (str, optional) – label to identify this call when it is used in multiple places.

print_info()[source]

Dump gathered debugging information to stdout.

reset_state()[source]
class tensorpack.dataflow.BatchData(ds, batch_size, remainder=False, use_list=False)[source]

Bases: tensorpack.dataflow.base.ProxyDataFlow

Concat datapoints into batches. It produces datapoints of the same number of components as ds, but each component has one new extra dimension of size batch_size. The new component can be a list of the original datapoints, or an ndarray of the original datapoints.

__init__(ds, batch_size, remainder=False, use_list=False)[source]
Parameters:
  • ds (DataFlow) – Its components must be either scalars or np.ndarray. Each component has to be of the same shape across datapoints.

  • batch_size (int) – batch size

  • remainder (bool) – whether to return the remaining data smaller than a batch_size. If set to True, it may produce a final datapoint with a smaller batch size. Otherwise, all generated datapoints are guaranteed to have the same batch size.

  • use_list (bool) – if True, it will run faster by producing a list of datapoints instead of an ndarray of datapoints, avoiding an extra copy.

get_data()[source]
Yields:Batched data by stacking each component on an extra 0th dimension.
size()[source]
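
Example

A minimal usage sketch, assuming ds produces [image, label] datapoints whose components have fixed shapes:

ds = BatchData(ds, batch_size=32, remainder=False)
# each datapoint is now [images of shape (32, ...), labels of shape (32,)]
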
class tensorpack.dataflow.BatchDataByShape(ds, batch_size, idx)[source]

Bases: tensorpack.dataflow.common.BatchData

Group datapoints of the same shape together into batches. Unlike BatchData, it doesn't require the input DataFlow to be homogeneous: datapoints may have different shapes, and batches are formed from those that share the same shape.

It is implemented with a dict mapping shape -> datapoints. Datapoints of uncommon shapes may never accumulate enough to form a batch, and therefore may never be produced.

__init__(ds, batch_size, idx)[source]
Parameters:
  • ds (DataFlow) – input DataFlow. dp[idx] has to be an np.ndarray.

  • batch_size (int) – batch size

  • idx (int) – dp[idx].shape will be used to group datapoints. Other components are assumed to have the same shape.

get_data()[source]
reset_state()[source]
class tensorpack.dataflow.FixedSizeData(ds, size)[source]

Bases: tensorpack.dataflow.base.ProxyDataFlow

Generate data from another DataFlow, but with a fixed size. The iterator of the underlying DataFlow will be kept if not exhausted.

__init__(ds, size)[source]
Parameters:
  • ds (DataFlow) – input DataFlow.

  • size (int) – size of this DataFlow.

get_data()[source]
size()[source]
class tensorpack.dataflow.MapData(ds, func)[source]

Bases: tensorpack.dataflow.base.ProxyDataFlow

Apply a mapper/filter on the DataFlow

__init__(ds, func)[source]
Parameters:
  • ds (DataFlow) – input DataFlow

  • func (datapoint -> datapoint | None) – takes a datapoint and returns a new datapoint. Return None to discard this data point. Note that if you use the filter feature, ds.size() will be incorrect.

Note

Please make sure func doesn’t modify the components unless you’re certain it’s safe.

get_data()[source]
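
Example

A sketch of mapping and filtering (the function f is illustrative):

def f(dp):
    if dp[1] < 0:
        return None              # discard datapoints with a negative second component
    return [dp[0] * 2] + dp[1:]  # transform the first component, keep the rest

ds = MapData(ds, f)
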
class tensorpack.dataflow.MapDataComponent(ds, func, index=0)[source]

Bases: tensorpack.dataflow.common.MapData

Apply a mapper/filter on a datapoint component

__init__(ds, func, index=0)[source]
Parameters:
  • ds (DataFlow) – input DataFlow.

  • func (TYPE -> TYPE|None) – takes dp[index], returns a new value for dp[index]. return None to discard this datapoint. Note that if you use the filter feature, ds.size() will be incorrect.

  • index (int) – index of the component.

Note

This proxy itself doesn’t modify the datapoints. But please make sure func doesn’t modify the components unless you’re certain it’s safe.

class tensorpack.dataflow.RepeatedData(ds, nr)[source]

Bases: tensorpack.dataflow.base.ProxyDataFlow

Take datapoints from another DataFlow and produce them until it has been exhausted a certain number of times.

__init__(ds, nr)[source]
Parameters:
  • ds (DataFlow) – input DataFlow

  • nr (int) – number of times to repeat ds. Set to -1 to repeat ds indefinitely.

get_data()[source]
size()[source]
Raises:ValueError when nr == -1.
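
Example

A common pattern is to repeat the dataflow indefinitely for training (a sketch):

ds = RepeatedData(ds, -1)   # note: size() raises ValueError in this case
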
class tensorpack.dataflow.RepeatedDataPoint(ds, nr)[source]

Bases: tensorpack.dataflow.base.ProxyDataFlow

Take datapoints from another DataFlow and produce each of them a certain number of times: dp1, ..., dp1, dp2, ..., dp2, ...

__init__(ds, nr)[source]
Parameters:
  • ds (DataFlow) – input DataFlow

  • nr (int) – number of times to repeat each datapoint.

get_data()[source]
size()[source]
class tensorpack.dataflow.RandomChooseData(df_lists)[source]

Bases: tensorpack.dataflow.base.RNGDataFlow

Randomly choose from several DataFlow. Stop producing when any of them is exhausted.

__init__(df_lists)[source]
Parameters:df_lists (list) – a list of DataFlow, or a list of (DataFlow, probability) tuples. Probabilities must sum to 1 if used.
get_data()[source]
reset_state()[source]
class tensorpack.dataflow.RandomMixData(df_lists)[source]

Bases: tensorpack.dataflow.base.RNGDataFlow

Perfectly mix datapoints from several DataFlows. Will stop when all of them are exhausted.

__init__(df_lists)[source]
Parameters:df_lists (list) – a list of DataFlow. All DataFlow must implement size().
get_data()[source]
reset_state()[source]
size()[source]
class tensorpack.dataflow.JoinData(df_lists)[source]

Bases: tensorpack.dataflow.base.DataFlow

Join the components from each DataFlow.

Examples:

dp1: [c1, c2]
dp2: [c3, c4]
join: [c1, c2, c3, c4]
__init__(df_lists)[source]
Parameters:df_lists (list) – a list of DataFlow. When these dataflows have different sizes, JoinData will stop when any of them is exhausted.
get_data()[source]
reset_state()[source]
size()[source]
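
Example

A sketch, assuming df1 produces [image] and df2 produces [label]:

ds = JoinData([df1, df2])   # produces [image, label]
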
class tensorpack.dataflow.ConcatData(df_lists)[source]

Bases: tensorpack.dataflow.base.DataFlow

Concatenate several dataflows. Produce datapoints from them one by one.

__init__(df_lists)[source]
Parameters:df_lists (list) – a list of DataFlow.
get_data()[source]
reset_state()[source]
size()[source]
tensorpack.dataflow.SelectComponent(ds, idxs)[source]

Select / reorder components from datapoints.

Parameters:
  • ds (DataFlow) – input DataFlow.

  • idxs (list[int]) – a list of component indices.

Example:

dp: [c1, c2, c3]
idxs: [2,1]
output dp: [c3, c2]
class tensorpack.dataflow.LocallyShuffleData(ds, buffer_size, nr_reuse=1, shuffle_interval=None)[source]

Bases: tensorpack.dataflow.base.ProxyDataFlow, tensorpack.dataflow.base.RNGDataFlow

Maintain a pool to buffer datapoints, and shuffle before producing them. This can be used as an alternative when a complete random read is too expensive or impossible for the data source.

__init__(ds, buffer_size, nr_reuse=1, shuffle_interval=None)[source]
Parameters:
  • ds (DataFlow) – input DataFlow.

  • buffer_size (int) – size of the buffer.

  • nr_reuse (int) – reuse each datapoint this many times to improve speed, at the risk of hurting your model.

  • shuffle_interval (int) – shuffle the buffer after this many datapoints have gone through it. Frequent shuffling of a large buffer may affect speed, while infrequent shuffling may reduce randomness. Defaults to buffer_size / 3.

get_data()[source]
reset_state()[source]
class tensorpack.dataflow.CacheData(ds, shuffle=False)[source]

Bases: tensorpack.dataflow.base.ProxyDataFlow

Cache a dataflow completely in memory.

__init__(ds, shuffle=False)[source]
Parameters:
  • ds (DataFlow) – input DataFlow.

  • shuffle (bool) – whether to shuffle the datapoints before producing them.

get_data()[source]
reset_state()[source]
class tensorpack.dataflow.HDF5Data(filename, data_paths, shuffle=True)[source]

Bases: tensorpack.dataflow.base.RNGDataFlow

Zip data from different paths in an HDF5 file.

Warning

The current implementation will load all data into memory.

__init__(filename, data_paths, shuffle=True)[source]
Parameters:
  • filename (str) – h5 data file.

  • data_paths (list) – list of h5 paths to be zipped, e.g. ['images', 'labels'].

  • shuffle (bool) – shuffle all data.

get_data()[source]
size()[source]
class tensorpack.dataflow.LMDBData(lmdb_path, shuffle=True, keys=None)[source]

Bases: tensorpack.dataflow.base.RNGDataFlow

Read an LMDB database and produce (k, v) pairs.

__init__(lmdb_path, shuffle=True, keys=None)[source]
Parameters:
  • lmdb_path (str) – a directory or a file.

  • shuffle (bool) – shuffle the keys or not.

  • keys (list[str] or str) –

    list of str as the keys, used only when shuffle is True. It can also be a format string e.g. {:0>8d} which will be formatted with the indices from 0 to total_size - 1.

    If not provided, it will then look in the database for __keys__ which dump_dataflow_to_lmdb() used to store the list of keys. If still not found, it will iterate over the database to find all the keys.

get_data()[source]
open_lmdb()[source]
reset_state()[source]
size()[source]
class tensorpack.dataflow.LMDBDataDecoder(lmdb_data, decoder)[source]

Bases: tensorpack.dataflow.common.MapData

Read an LMDB database and produce decoded output.

__init__(lmdb_data, decoder)[source]
Parameters:
  • lmdb_data – a LMDBData instance.

  • decoder (k,v -> dp | None) – a function taking k, v and returning a datapoint, or return None to discard.

class tensorpack.dataflow.LMDBDataPoint(*args, **kwargs)[source]

Bases: tensorpack.dataflow.common.MapData

Read a LMDB file and produce deserialized datapoints. It reads the database produced by tensorpack.dataflow.dftools.dump_dataflow_to_lmdb().

Example

ds = LMDBDataPoint("/data/ImageNet.lmdb", shuffle=False)

# alternatively:
ds = LMDBData("/data/ImageNet.lmdb", shuffle=False)
ds = LocallyShuffleData(ds, 50000)
ds = LMDBDataPoint(ds)
__init__(*args, **kwargs)[source]
Parameters:args, kwargs – Same as in LMDBData.
tensorpack.dataflow.CaffeLMDB(lmdb_path, shuffle=True, keys=None)[source]

Read a Caffe LMDB file where each value contains a caffe.Datum protobuf. Produces datapoints of the format: [HWC image, label].

Note that Caffe LMDB format is not efficient: it stores serialized raw arrays rather than JPEG images.

Parameters:lmdb_path, shuffle, keys – same as in LMDBData.
Returns:a LMDBDataDecoder instance.

Example

ds = CaffeLMDB("/tmp/validation", keys='{:0>8d}')
class tensorpack.dataflow.SVMLightData(filename, shuffle=True)[source]

Bases: tensorpack.dataflow.base.RNGDataFlow

Read X,y from a svmlight file, and produce [X_i, y_i] pairs.

__init__(filename, shuffle=True)[source]
Parameters:
  • filename (str) – input file

  • shuffle (bool) – shuffle the data

get_data()[source]
size()[source]
class tensorpack.dataflow.TFRecordData(path, size=None)[source]

Bases: tensorpack.dataflow.base.DataFlow

Produce datapoints from a TFRecord file, assuming each record is serialized by serialize.dumps(). This class works with dftools.dump_dataflow_to_tfrecord().

__init__(path, size=None)[source]
Parameters:
  • path (str) – path to the tfrecord file

  • size (int) – total number of records, because this metadata is not stored in the tfrecord file.

get_data()[source]
size()[source]
class tensorpack.dataflow.ImageFromFile(files, channel=3, resize=None, shuffle=False)[source]

Bases: tensorpack.dataflow.base.RNGDataFlow

Produce images read from a list of files.

__init__(files, channel=3, resize=None, shuffle=False)[source]
Parameters:
  • files (list) – list of file paths.

  • channel (int) – 1 or 3. Will convert grayscale to RGB images if channel==3.

  • resize (tuple) – int or (h, w) tuple. If given, resize the image.

get_data()[source]
size()[source]
class tensorpack.dataflow.AugmentImageComponent(ds, augmentors, index=0, copy=True)[source]

Bases: tensorpack.dataflow.common.MapDataComponent

Apply image augmentors on 1 component.

__init__(ds, augmentors, index=0, copy=True)[source]
Parameters:
  • ds (DataFlow) – input DataFlow.

  • augmentors (AugmentorList) – a list of imgaug.ImageAugmentor to be applied in order.

  • index (int) – the index of the image component to be augmented.

  • copy (bool) – Some augmentors modify the input images. When copy is True, a copy will be made before any augmentors are applied, so that the original images are not modified. Turn it off to save time when you know it’s OK.

reset_state()[source]
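
Example

A sketch of a typical augmentation step; the augmentors shown are assumed to come from tensorpack.dataflow.imgaug:

from tensorpack.dataflow import imgaug

augmentors = [imgaug.Flip(horiz=True), imgaug.Resize((224, 224))]
ds = AugmentImageComponent(ds, augmentors, index=0)
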
class tensorpack.dataflow.AugmentImageCoordinates(ds, augmentors, img_index=0, coords_index=1, copy=True)[source]

Bases: tensorpack.dataflow.common.MapData

Apply image augmentors on an image and a set of coordinates.

__init__(ds, augmentors, img_index=0, coords_index=1, copy=True)[source]
Parameters:
  • ds (DataFlow) – input DataFlow.

  • augmentors (AugmentorList) – a list of imgaug.ImageAugmentor to be applied in order.

  • img_index (int) – the index of the image component to be augmented.

  • coords_index (int) – the index of the coordinate component to be augmented.

  • copy (bool) – Some augmentors modify the input images. When copy is True, a copy will be made before any augmentors are applied, so that the original images are not modified. Turn it off to save time when you know it’s OK.

reset_state()[source]
class tensorpack.dataflow.AugmentImageComponents(ds, augmentors, index=(0, 1), copy=True)[source]

Bases: tensorpack.dataflow.common.MapData

Apply image augmentors on several components, with shared augmentation parameters.

__init__(ds, augmentors, index=(0, 1), copy=True)[source]
Parameters:
  • ds (DataFlow) – input DataFlow.

  • augmentors (AugmentorList) – a list of imgaug.ImageAugmentor instance to be applied in order.

  • index – tuple of indices of components.

  • copy (bool) – Some augmentors modify the input images. When copy is True, a copy will be made before any augmentors are applied, so that the original images are not modified. Turn it off to save time when you know it’s OK.

reset_state()[source]
class tensorpack.dataflow.PrefetchData(ds, nr_prefetch, nr_proc=1)[source]

Bases: tensorpack.dataflow.base.ProxyDataFlow

Prefetch data from a DataFlow using Python multiprocessing utilities.

Note

This is significantly slower than PrefetchDataZMQ when data is large.

__init__(ds, nr_prefetch, nr_proc=1)[source]
Parameters:
  • ds (DataFlow) – input DataFlow.

  • nr_prefetch (int) – size of the queue to hold prefetched datapoints.

  • nr_proc (int) – number of processes to use.

get_data()[source]
reset_state()[source]
class tensorpack.dataflow.PrefetchDataZMQ(ds, nr_proc=1, hwm=50)[source]

Bases: tensorpack.dataflow.base.ProxyDataFlow

Prefetch data from a DataFlow using multiple processes, with ZMQ for communication.

A local directory is needed to put the ZMQ pipes. You can set this with env var $TENSORPACK_PIPEDIR if you’re running on non-local FS such as NFS or GlusterFS.

Note that this dataflow is not fork-safe. You cannot nest this dataflow into another PrefetchDataZMQ or PrefetchData.

__init__(ds, nr_proc=1, hwm=50)[source]
Parameters:
  • ds (DataFlow) – input DataFlow.

  • nr_proc (int) – number of processes to use.

  • hwm (int) – the zmq “high-water mark” for both sender and receiver.

get_data()[source]
reset_state()[source]
start_processes()[source]
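
Example

A sketch of the usual way to parallelize a dataflow pipeline:

ds = PrefetchDataZMQ(ds, nr_proc=4)   # 4 worker processes, each running a copy of ds
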
class tensorpack.dataflow.PrefetchOnGPUs(ds, gpus, pipedir=None)[source]

Bases: tensorpack.dataflow.prefetch.PrefetchDataZMQ

Prefetch with each process having its own CUDA_VISIBLE_DEVICES variable mapped to one GPU.

__init__(ds, gpus, pipedir=None)[source]
Parameters:
  • ds (DataFlow) – input DataFlow.

  • gpus (list[int]) – list of GPUs to use. Will also start this many processes.

  • pipedir (str) – a local directory where the pipes should be put. Useful if you’re running on non-local FS such as NFS or GlusterFS.

start_processes()[source]
class tensorpack.dataflow.ThreadedMapData(ds, nr_thread, map_func, buffer_size=200)[source]

Bases: tensorpack.dataflow.base.ProxyDataFlow

Same as MapData, but start threads to run the mapping function. This is useful when the mapping function is the bottleneck, but you don’t want to start processes for the entire dataflow pipeline.

With threads, the communication overhead is tiny, but due to the GIL, you should avoid starting the threads in your main process. Note that the threads will only start in the process which calls reset_state().

__init__(ds, nr_thread, map_func, buffer_size=200)[source]
Parameters:
  • ds (DataFlow) – the dataflow to map

  • nr_thread (int) – number of threads to use

  • map_func (callable) – datapoint -> datapoint | None

  • buffer_size (int) – number of datapoints in the buffer

get_data()[source]
reset_state()[source]
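
Example

A sketch, assuming decode_fn is an expensive per-datapoint function (e.g. JPEG decoding) defined elsewhere:

ds = ThreadedMapData(ds, nr_thread=10, map_func=decode_fn, buffer_size=200)
ds.reset_state()   # the threads only start in the process that calls reset_state()
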
class tensorpack.dataflow.FakeData(shapes, size=1000, random=True, dtype='float32', domain=(0, 1))[source]

Bases: tensorpack.dataflow.base.RNGDataFlow

Generate fake data of given shapes

__init__(shapes, size=1000, random=True, dtype='float32', domain=(0, 1))[source]
Parameters:
  • shapes (list) – a list of lists/tuples. Shapes of each component.

  • size (int) – size of this DataFlow.

  • random (bool) – whether to randomly generate data every iteration. Note that merely generating the data could sometimes be time-consuming!

  • dtype (str or list) – data type as string, or a list of data types.

  • domain (tuple or list) – (min, max) tuple, or a list of such tuples

get_data()[source]
size()[source]
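
Example

A sketch of generating image-sized fake data, useful for benchmarking the rest of the pipeline:

ds = FakeData([[224, 224, 3], [1]], size=1000)   # an image-shaped component and a one-element component (e.g. a label)
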
class tensorpack.dataflow.DataFromQueue(queue)[source]

Bases: tensorpack.dataflow.base.DataFlow

Produce data from a queue

__init__(queue)[source]
Parameters:queue (queue) – a queue with a get() method.
get_data()[source]
class tensorpack.dataflow.DataFromList(lst, shuffle=True)[source]

Bases: tensorpack.dataflow.base.RNGDataFlow

Produce data from a list

__init__(lst, shuffle=True)[source]
Parameters:
  • lst (list) – input list.

  • shuffle (bool) – shuffle data.

get_data()[source]
size()[source]
class tensorpack.dataflow.DataFromGenerator(gen, size=None)[source]

Bases: tensorpack.dataflow.base.DataFlow

Wrap a generator to a DataFlow

get_data()[source]
size()[source]
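
Example

A sketch of wrapping a plain Python generator (the generator here is illustrative; in this version the generator object itself is assumed to be passed in):

def gen():
    for i in range(100):
        yield [i]

ds = DataFromGenerator(gen(), size=100)
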
tensorpack.dataflow.send_dataflow_zmq(df, addr, hwm=50, print_interval=100, format=None)[source]

Run DataFlow and send data to a ZMQ socket addr. It will dump and send each datapoint to this addr with a PUSH socket.

Parameters:
  • df (DataFlow) – Will infinitely loop over the DataFlow.

  • addr – a ZMQ socket addr.

  • hwm (int) – high water mark

class tensorpack.dataflow.RemoteDataZMQ(addr1, addr2=None)[source]

Bases: tensorpack.dataflow.base.DataFlow

Produce data from ZMQ PULL socket(s).

cnt1, cnt2

int – number of data points received from addr1 and addr2

__init__(addr1, addr2=None)[source]
Parameters:addr1,addr2 (str) – addr of the socket to connect to. Use both if you need two protocols (e.g. both IPC and TCP). I don’t think you’ll ever need 3.
get_data()[source]
reset_state()[source]
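
Example

A sketch of feeding data across machines; the addresses and hostname are illustrative. The receiving side pulls from the socket, the sending side pushes to it:

# on the training machine
ds = RemoteDataZMQ('tcp://0.0.0.0:8888')

# on the machine(s) generating data
send_dataflow_zmq(df, 'tcp://train-machine:8888')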

tensorpack.dataflow.dftools module

tensorpack.dataflow.dftools.dump_dataflow_images(df, dirname, max_count=None, index=0)[source]

Dump images from a DataFlow to a directory.

Parameters:
  • df (DataFlow) – the DataFlow to dump.

  • dirname (str) – name of the directory.

  • max_count (int) – limit max number of images to dump. Defaults to unlimited.

  • index (int) – the index of the image component in the data point.

tensorpack.dataflow.dftools.dump_dataflow_to_process_queue(df, size, nr_consumer)[source]

Convert a DataFlow to a multiprocessing.Queue. The DataFlow will only be reset in the spawned process.

Parameters:
  • df (DataFlow) – the DataFlow to dump.

  • size (int) – size of the queue

  • nr_consumer (int) – number of consumers of the queue. The producer will add this many DIE sentinels to the end of the queue.

Returns:

tuple(queue, process) – The process will take data from df and fill the queue, once you start it. Each element in the queue is (idx, dp). idx can be the DIE sentinel when df is exhausted.

tensorpack.dataflow.dftools.dump_dataflow_to_lmdb(df, lmdb_path, write_frequency=5000)[source]

Dump a DataFlow to an lmdb database, where the keys are indices and the values are serialized datapoints. The output database can be read directly by tensorpack.dataflow.LMDBDataPoint.

Parameters:
  • df (DataFlow) – the DataFlow to dump.

  • lmdb_path (str) – output path. Either a directory or an mdb file.

  • write_frequency (int) – the frequency to write back data to disk.
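
Example

A sketch of dumping a dataflow once and reading it back (the path is illustrative):

dump_dataflow_to_lmdb(ds, '/path/to/dataset.lmdb')
ds = LMDBDataPoint('/path/to/dataset.lmdb', shuffle=True)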

tensorpack.dataflow.dftools.dump_dataflow_to_tfrecord(df, path)[source]

Dump all datapoints of a Dataflow to a TensorFlow TFRecord file, using serialize.dumps() to serialize.

Parameters:
  • df (DataFlow) – the DataFlow to dump.

  • path (str) – the output file path