# -*- coding: utf-8 -*-
# File: serialize.py

import numpy as np
import os
import platform
from collections import defaultdict

from ..utils import logger
from ..utils.serialize import dumps, loads
from ..utils.develop import create_dummy_class  # noqa
from ..utils.utils import get_tqdm
from .base import DataFlow
from .common import FixedSizeData, MapData
from .format import HDF5Data, LMDBData, DiskCacheData
from .raw import DataFromGenerator, DataFromList

__all__ = ['LMDBSerializer', 'NumpySerializer', 'TFRecordSerializer', 'HDF5Serializer']


def _reset_df_and_get_size(df):
    df.reset_state()
    try:
        sz = len(df)
    except NotImplementedError:
        sz = 0
    return sz


class LMDBSerializer():
    """
    Serialize a Dataflow to a lmdb database, where the keys are indices and values
    are serialized datapoints.

    You will need to ``pip install lmdb`` to use it.

    Example:

    .. code-block:: python

        LMDBSerializer.save(my_df, "output.lmdb")

        new_df = LMDBSerializer.load("output.lmdb", shuffle=True)
    """

    @staticmethod
    def save(df, path, write_frequency=5000):
        """
        Args:
            df (DataFlow): the DataFlow to serialize.
            path (str): output path. Either a directory or an lmdb file.
            write_frequency (int): the frequency to write back data to disk.
                A smaller value reduces memory usage.
        """
        assert isinstance(df, DataFlow), type(df)
        isdir = os.path.isdir(path)
        if isdir:
            assert not os.path.isfile(os.path.join(path, 'data.mdb')), "LMDB file exists!"
        else:
            assert not os.path.isfile(path), "LMDB file {} exists!".format(path)
        # It's OK to use super large map_size on Linux, but not on other platforms
        # See: https://github.com/NVIDIA/DIGITS/issues/206
        map_size = 1099511627776 * 2 if platform.system() == 'Linux' else 128 * 10**6
        db = lmdb.open(path, subdir=isdir,
                       map_size=map_size, readonly=False,
                       meminit=False, map_async=True)    # need sync() at the end
        size = _reset_df_and_get_size(df)

        # put data into lmdb, and doubling the size if full.
        # Ref: https://github.com/NVIDIA/DIGITS/pull/209/files
        def put_or_grow(txn, key, value):
            try:
                txn.put(key, value)
                return txn
            except lmdb.MapFullError:
                pass
            txn.abort()
            curr_size = db.info()['map_size']
            new_size = curr_size * 2
            logger.info("Doubling LMDB map_size to {:.2f}GB".format(new_size / 10**9))
            db.set_mapsize(new_size)
            txn = db.begin(write=True)
            txn = put_or_grow(txn, key, value)
            return txn

        with get_tqdm(total=size) as pbar:
            idx = -1

            # LMDB transaction is not exception-safe!
            # although it has a context manager interface
            txn = db.begin(write=True)
            for idx, dp in enumerate(df):
                txn = put_or_grow(txn, u'{:08}'.format(idx).encode('ascii'), dumps(dp))
                pbar.update()
                if (idx + 1) % write_frequency == 0:
                    txn.commit()
                    txn = db.begin(write=True)
            txn.commit()

            keys = [u'{:08}'.format(k).encode('ascii') for k in range(idx + 1)]
            with db.begin(write=True) as txn:
                txn = put_or_grow(txn, b'__keys__', dumps(keys))

            logger.info("Flushing database ...")
            db.sync()
        db.close()

    @staticmethod
    def load(path, shuffle=True):
        """
        Note:
            If deserialization turns out to be the bottleneck, use :class:`LMDBData`
            as the reader and run deserialization as a mapper in parallel.
        """
        df = LMDBData(path, shuffle=shuffle)
        return MapData(df, LMDBSerializer._deserialize_lmdb)

    @staticmethod
    def _deserialize_lmdb(dp):
        return loads(dp[1])


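# Usage sketch (illustrative, not part of the library API): the note in
# ``LMDBSerializer.load`` suggests moving deserialization into a parallel mapper
# when it becomes the bottleneck. A minimal sketch, assuming ``MultiProcessMapData``
# from ``tensorpack.dataflow.parallel_map`` is available; the helper name and the
# default ``num_proc`` value are placeholders.
def _example_lmdb_parallel_load(path, num_proc=4):
    from .parallel_map import MultiProcessMapData   # imported lazily for the example
    raw = LMDBData(path, shuffle=True)              # reader only, yields (key, value)
    # deserialize in `num_proc` worker processes instead of the main process
    return MultiProcessMapData(raw, num_proc, lambda dp: loads(dp[1]))

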
class DiskCacheSerializer():
    """
    Serialize a Dataflow to a diskcache database, where the keys are indices and values
    are serialized datapoints.

    Example:

    .. code-block:: python

        DiskCacheSerializer.save(my_df, "output.diskcache")

        new_df = DiskCacheSerializer.load("output.diskcache", shuffle=True)
    """

    @staticmethod
    def save(df, path, write_frequency=5000):
        """
        Args:
            df (DataFlow): the DataFlow to serialize.
            path (str): output directory for the diskcache database.
            write_frequency (int): currently unused.
        """
        assert isinstance(df, DataFlow), type(df)
        assert not os.path.isfile(path), "DiskCache path {} must be a directory!".format(path)
        db = diskcache.Index(path)
        size = _reset_df_and_get_size(df)
        with get_tqdm(total=size) as pbar:
            idx = -1
            for idx, dp in enumerate(df):
                db[idx] = dumps(dp)
                pbar.update()

    @staticmethod
    def load(path, shuffle=True):
        """
        Note:
            If deserialization turns out to be the bottleneck, use :class:`DiskCacheData`
            as the reader and run deserialization as a mapper in parallel.
        """
        df = DiskCacheData(path, shuffle=shuffle)
        return MapData(df, loads)


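# Usage sketch (illustrative, not part of the library API): a save/load round trip
# with DiskCacheSerializer. The cache directory below is a placeholder.
def _example_diskcache_roundtrip(df, cache_dir="/tmp/example_diskcache"):
    DiskCacheSerializer.save(df, cache_dir)               # one key per datapoint index
    new_df = DiskCacheSerializer.load(cache_dir, shuffle=False)
    new_df.reset_state()
    return next(iter(new_df))                             # first datapoint, already deserialized

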
class NumpySerializer():
    """
    Serialize the entire dataflow to a npz dict.
    Note that this would have to store the entire dataflow in memory,
    and is also >10x slower than LMDB/TFRecord serializers.
    """

    @staticmethod
    def save(df, path):
        """
        Args:
            df (DataFlow): the DataFlow to serialize.
            path (str): output npz file.
        """
        buffer = []
        size = _reset_df_and_get_size(df)
        with get_tqdm(total=size) as pbar:
            for dp in df:
                buffer.append(dp)
                pbar.update()
        # ``np.object`` was removed in NumPy 1.24; the builtin ``object`` dtype is equivalent
        np.savez_compressed(path, buffer=np.asarray(buffer, dtype=object))

    @staticmethod
    def load(path, shuffle=True):
        # allow_pickle defaults to False since numpy 1.16.3
        # (https://www.numpy.org/devdocs/release.html#unpickling-while-loading-requires-explicit-opt-in)
        buffer = np.load(path, allow_pickle=True)['buffer']
        return DataFromList(buffer, shuffle=shuffle)


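# Usage sketch (illustrative, not part of the library API): NumpySerializer buffers
# every datapoint in memory before writing, so it only suits small dataflows.
# The file name below is a placeholder.
def _example_numpy_roundtrip(df, path="/tmp/small_df.npz"):
    NumpySerializer.save(df, path)                        # buffers all datapoints, writes one npz
    new_df = NumpySerializer.load(path, shuffle=False)
    new_df.reset_state()
    return len(new_df)                                    # DataFromList knows its length

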
class TFRecordSerializer():
    """
    Serialize datapoints to bytes (by tensorpack's default serializer) and write to a TFRecord file.

    Note that TFRecord does not support random access and is in fact not very performant.
    It's better to use :class:`LMDBSerializer`.
    """

    @staticmethod
    def save(df, path):
        """
        Args:
            df (DataFlow): the DataFlow to serialize.
            path (str): output tfrecord file.
        """
        size = _reset_df_and_get_size(df)
        with tf.python_io.TFRecordWriter(path) as writer, get_tqdm(total=size) as pbar:
            for dp in df:
                writer.write(dumps(dp))
                pbar.update()

    @staticmethod
    def load(path, size=None):
        """
        Args:
            size (int): total number of records. If not provided, the returned
                dataflow will have no `__len__()`. It's needed because this
                metadata is not stored in the TFRecord file.
        """
        gen = tf.python_io.tf_record_iterator(path)
        ds = DataFromGenerator(gen)
        ds = MapData(ds, loads)
        if size is not None:
            ds = FixedSizeData(ds, size)
        return ds


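# Usage sketch (illustrative, not part of the library API): TFRecord files carry no
# record count, so pass ``size`` explicitly when downstream code needs ``len()``.
# The file name is a placeholder, and the input ``df`` is assumed to implement ``__len__``.
def _example_tfrecord_roundtrip(df, path="/tmp/example_df.tfrecords"):
    TFRecordSerializer.save(df, path)
    new_df = TFRecordSerializer.load(path, size=len(df))  # FixedSizeData restores __len__
    new_df.reset_state()
    return len(new_df)

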
class HDF5Serializer():
    """
    Write datapoints to an HDF5 file.

    Note that HDF5 files are in fact not very performant and
    currently do not support lazy loading.
    It's better to use :class:`LMDBSerializer`.
    """

    @staticmethod
    def save(df, path, data_paths):
        """
        Args:
            df (DataFlow): the DataFlow to serialize.
            path (str): output hdf5 file.
            data_paths (list[str]): list of h5 paths. It should have the same
                length as each datapoint, and each path should correspond to one
                component of the datapoint.
        """
        size = _reset_df_and_get_size(df)
        buffer = defaultdict(list)

        with get_tqdm(total=size) as pbar:
            for dp in df:
                assert len(dp) == len(data_paths), "Datapoint has {} components!".format(len(dp))
                for k, el in zip(data_paths, dp):
                    buffer[k].append(el)
                pbar.update()

        with h5py.File(path, 'w') as hf, get_tqdm(total=len(data_paths)) as pbar:
            for data_path in data_paths:
                hf.create_dataset(data_path, data=buffer[data_path])
                pbar.update()

    @staticmethod
    def load(path, data_paths, shuffle=True):
        """
        Args:
            data_paths (list): list of h5 paths to be zipped.
        """
        return HDF5Data(path, data_paths, shuffle)


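# Usage sketch (illustrative, not part of the library API): each component of a
# datapoint is written to one h5 path. The file name and dataset names are placeholders
# for a dataflow whose datapoints have exactly two components.
def _example_hdf5_roundtrip(df, path="/tmp/example_df.h5"):
    data_paths = ['images', 'labels']                     # must match the datapoint length
    HDF5Serializer.save(df, path, data_paths)
    new_df = HDF5Serializer.load(path, data_paths, shuffle=False)
    new_df.reset_state()
    return next(iter(new_df))

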
try:
    import lmdb
except ImportError:
    LMDBSerializer = create_dummy_class('LMDBSerializer', 'lmdb')   # noqa

try:
    from tensorpack.compat import tfv1 as tf
except ImportError:
    TFRecordSerializer = create_dummy_class('TFRecordSerializer', 'tensorflow')   # noqa

try:
    import h5py
except ImportError:
    HDF5Serializer = create_dummy_class('HDF5Serializer', 'h5py')   # noqa

try:
    import diskcache
except ImportError:
    DiskCacheSerializer = create_dummy_class('DiskCacheSerializer', 'diskcache')   # noqa


if __name__ == '__main__':
    from .raw import FakeData
    import time

    ds = FakeData([[300, 300, 3], [1]], 1000)

    print(time.time())
    DiskCacheSerializer.save(ds, '/tmp/diskcache')
    print(time.time())
    df = DiskCacheSerializer.load('/tmp/diskcache')
    df.reset_state()
    assert len(df) == 1000, len(df)
    for idx, dp in enumerate(df):
        pass
    print("DiskCache Finished, ", idx)
    print(time.time())

    print(time.time())
    TFRecordSerializer.save(ds, 'out.tfrecords')
    print(time.time())
    df = TFRecordSerializer.load('out.tfrecords', size=1000)
    df.reset_state()
    for idx, dp in enumerate(df):
        pass
    print("TF Finished, ", idx)
    print(time.time())

    LMDBSerializer.save(ds, 'out.lmdb')
    print(time.time())
    df = LMDBSerializer.load('out.lmdb')
    df.reset_state()
    for idx, dp in enumerate(df):
        pass
    print("LMDB Finished, ", idx)
    print(time.time())

    NumpySerializer.save(ds, 'out.npz')
    print(time.time())
    df = NumpySerializer.load('out.npz')
    df.reset_state()
    for idx, dp in enumerate(df):
        pass
    print("Numpy Finished, ", idx)
    print(time.time())

    paths = ['p1', 'p2']
    HDF5Serializer.save(ds, 'out.h5', paths)
    print(time.time())
    df = HDF5Serializer.load('out.h5', paths)
    df.reset_state()
    for idx, dp in enumerate(df):
        pass
    print("HDF5 Finished, ", idx)
    print(time.time())