jupyter_Note-DeepLearningDataPipeline
In [125]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from sklearn import datasets 
import pandas as pd
import numpy as np 
import matplotlib.pyplot as plt
print('Tensorflow version:', tf.__version__)
Tensorflow version: 2.5.0

Summary

Balancing training speed and memory usage

When memory cannot hold the entire dataset together with the intermediate computation, the input pipeline has to be carefully designed to avoid overloading the memory. The practical solution is to feed only a small batch into the training model at a time.
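
As a minimal sketch of that idea (the random features and labels arrays below are placeholders, not part of the demo datasets), tf.data streams the data batch by batch so that only one batch lives in memory at a time:

import tensorflow as tf

# Placeholder data standing in for any array-like source.
features = tf.random.normal((1000, 4))
labels = tf.random.uniform((1000,), maxval=3, dtype=tf.int32)

# Each iteration yields one batch of 32 samples instead of the full dataset.
dataset = tf.data.Dataset.from_tensor_slices((features, labels)).batch(32)
for batch_features, batch_labels in dataset.take(1):
    print(batch_features.shape, batch_labels.shape)  # (32, 4) (32,)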

In this demo, I will be sharing my experience on

  • boosting execution performance using caching and parallel processing techniques at different steps of a data pipeline.
  • optimizing memory usage using a suitable dataset directory structure and data pipeline.

Summary of data pipeline workflow

Summary on dataset directory structure

| Dataset directory structure | Case of usage | Data size | Preferable data type |
| --- | --- | --- | --- |
| Unsorted | single data file with entire data | mostly small | structured data |
| Sorted by class | multiple data files | mostly large | unstructured data |
| Sorted by train/test/val split and class | isolated train/test/val data | mostly large | unstructured data |

Summary on caching and parallel processing

Time expense comes from two sources:

  • Data preparation: waiting time spent on opening, reading, and mapping data before training.
  • Parameter update: limited by computational speed, which depends on the CPU and GPU.
| Method for improving pipeline capacity | Operation | Resource traded for time expense | Waiting time saved during data preparation |
| --- | --- | --- | --- |
| tf.data.Dataset.interleave(num_parallel_calls=tf.data.AUTOTUNE) | Parallelizing data extraction | CPU | looping over files while reading data |
| tf.data.Dataset.map(num_parallel_calls=tf.data.AUTOTUNE) | Parallelizing data transformation | CPU | applying the data-processing mapping |
| tf.data.Dataset.cache() | Storing results in RAM during the first epoch | RAM | repeating the same mapping in every new epoch |
| tf.data.Dataset.prefetch(buffer_size=tf.data.AUTOTUNE) | Preparing data for the next step during the current one | CPU, RAM | waiting for reading and mapping before training |
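
Below is a minimal sketch chaining these four calls into one pipeline. The shard pattern and the parse_line mapping are placeholder assumptions; only the structure of the chain is the point.

# Hypothetical file pattern; replace with the real shard paths.
files = tf.data.Dataset.list_files("dataset/shards/*.csv")

def parse_line(line):
    # Placeholder transformation: split one CSV line into string fields
    # (assumes every line has the same number of fields).
    return tf.strings.split(line, ",")

dataset = (
    files
    # Parallelize data extraction: read several files concurrently.
    .interleave(tf.data.TextLineDataset, num_parallel_calls=tf.data.AUTOTUNE)
    # Parallelize data transformation across CPU cores.
    .map(parse_line, num_parallel_calls=tf.data.AUTOTUNE)
    # Keep mapped elements in RAM so later epochs skip reading and mapping.
    .cache()
    .batch(32)
    # Overlap preparation of the next batch with the current training step.
    .prefetch(buffer_size=tf.data.AUTOTUNE)
)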

Summary of input API usage

| Dataset directory structure | Number of files | API | Input | Special usage |
| --- | --- | --- | --- | --- |
| unsorted | single | tf.data.Dataset.from_tensors() / tf.data.Dataset.from_tensor_slices() | csv or filepath list | numpy: undistinguished features; pandas: distinguished (named) features |
| unsorted | multiple | tf.data.TextLineDataset() | csv/txt | to quickly create a dataset from lines of text |
| unsorted | multiple | tf.data.experimental.make_csv_dataset() | filepath list | to create a dataset with shuffling and batching |
| unsorted | multiple | tf.data.Dataset.list_files() | data folder directory | to create a dataset of all files matching a pattern |
| sorted | multiple | tf.keras.preprocessing.image.ImageDataGenerator().flow_from_directory() / tf.keras.preprocessing.text_dataset_from_directory() | data folder directory | to create a dataset whose labels are generated from sub-directory names, optionally with data augmentation |
| sorted | multiple | tf.data.Dataset.from_generator() | data generator | to create a dataset with high customizability using Python logic |
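
tf.data.Dataset.list_files() does not get its own cell in the example code below, so here is a minimal sketch; the .wav glob is an assumed path for the free_spoken_digit_dataset listed in the sample-dataset table.

# Build a dataset of file paths matching a glob pattern (path is an assumption).
file_dataset = tf.data.Dataset.list_files(
    "dataset/free_spoken_digit_dataset/*.wav", shuffle=True, seed=42)

def load_wav(path):
    # Decode a 16-bit PCM .wav file into a float32 waveform tensor.
    audio, sample_rate = tf.audio.decode_wav(tf.io.read_file(path))
    return audio, sample_rate

dataset = file_dataset.map(load_wav, num_parallel_calls=tf.data.AUTOTUNE)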

Introduction

Dataset Directory Structure

Unsorted

.
├── data 
├── code.py
└── model

Sorted by class

.
├── data 
│     ├── label_1
│     ├── label_2
│     └── label_3
├── code.py
└── model

Sorted by train/test/val split and class

.
├── data 
│    ├── train
│    │   ├── label_1
│    │   ├── label_2
│    │   └── label_3
│    ├── test
│    │   ├── label_1
│    │   ├── label_2
│    │   └── label_3
│    └── validation
│        ├── label_1
│        ├── label_2
│        └── label_3
├── code.py
└── model

Input pipeline API

The tf.data module contains the following classes:

  • tf.data.TFRecordDataset: TFRecord is TensorFlow's dataset storage format. When a dataset is organized in TFRecord format, TensorFlow can read and process it efficiently, which helps with large-scale model training (a minimal sketch follows this list).
  • tf.data.TextLineDataset: provides an easy way to extract lines from one or more text files. Each line of text is one element, a string tensor.
  • tf.data.Dataset: a higher-level wrapper for the data input pipeline. An instance of this class can be seen as an iterable, ordered list of "elements" of the same type.
    • tf.data.Dataset.from_tensors()
    • tf.data.Dataset.from_tensor_slices()
    • tf.data.Dataset.from_generator()
    • tf.data.Dataset.list_files()
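
Since tf.data.TFRecordDataset is not covered by the example code later in this note, here is a minimal, self-contained sketch of writing one record and reading it back; the file name demo.tfrecord and the Iris-style values are only illustrative.

# Write a single serialized tf.train.Example to a TFRecord file (file name is illustrative).
example = tf.train.Example(features=tf.train.Features(feature={
    "feature": tf.train.Feature(float_list=tf.train.FloatList(value=[5.1, 3.5, 1.4, 0.2])),
    "label": tf.train.Feature(bytes_list=tf.train.BytesList(value=[b"Iris-setosa"])),
}))
with tf.io.TFRecordWriter("demo.tfrecord") as writer:
    writer.write(example.SerializeToString())

# Read the file back and parse each serialized Example into tensors.
feature_spec = {
    "feature": tf.io.FixedLenFeature([4], tf.float32),
    "label": tf.io.FixedLenFeature([], tf.string),
}
dataset = tf.data.TFRecordDataset("demo.tfrecord").map(
    lambda record: tf.io.parse_single_example(record, feature_spec))
for element in dataset.take(1):
    print(element["feature"], element["label"])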

The tf.keras.preprocessing module contains three higher-level ETL APIs that build tf.data.Dataset objects before they are fed to the model.

  • text_dataset_from_directory(): Generates a tf.data.Dataset from text files in a directory.
  • image_dataset_from_directory(): Generates a tf.data.Dataset from image files in a directory.
  • timeseries_dataset_from_array(): Creates a dataset of sliding windows over a timeseries provided as an array. (Not included in this demo)

Sample dataset

To cover examples of various combinations of dataset directory structure, number of files, and type of data, I will be using the following open-source datasets.

| Dataset directory structure | Number of files | Data type | Dataset | File type | Number of samples (train/test) | Number of attributes (tensor shape) | Number of classes |
| --- | --- | --- | --- | --- | --- | --- | --- |
| unsorted | single | structured | Iris Data Set | .csv | 150 | (4,) | 3 |
| unsorted | single | timeseries | House_Property_Sales | .csv | 29581 | (3,) | numeric |
| unsorted | single | text | Corporate_messaging_DFE | .csv | 3119 | (9,) | 4 |
| unsorted | multiple | audio | free_spoken_digit_dataset | .wav | 3000 | no identical length | 10 |
| sorted | multiple | text | stack_overflow_16k | .txt | 8000/8000 | no identical length | 4 |
| sorted | multiple | image | MNIST database of handwritten digits | .png | 60,000/10,000 | (28, 28, 1) | 10 |

Example code

reference: 30天吃掉那只 TensorFlow2

From unsorted single file

Consuming a NumPy array from an unsorted single CSV

In [156]:
df = pd.read_csv('dataset/Iris/Iris.csv').to_numpy()
X = df[:,1:5].astype("float32")
y = df[:,-1]

dataset = tf.data.Dataset.from_tensor_slices((X,y))
for features,label in dataset.take(1):
    print('Feature: \n',features, '\n')
    print('label: \n',label)
Feature: 
 tf.Tensor([5.1 3.5 1.4 0.2], shape=(4,), dtype=float32) 

label: 
 tf.Tensor(b'Iris-setosa', shape=(), dtype=string)

Consuming a Pandas DataFrame from an unsorted single CSV

In [157]:
df = pd.read_csv('dataset/Iris/Iris.csv')
X = pd.DataFrame(df, columns = ['SepalLengthCm', 'SepalWidthCm', 'PetalLengthCm', 'PetalWidthCm']).to_dict("list")
y = df[['Species']]

dataset = tf.data.Dataset.from_tensor_slices((X,y))
for features,label in dataset.take(1):
    print('Feature: \n',features, '\n')
    print('label: \n',label)
      
Feature: 
 {'SepalLengthCm': <tf.Tensor: shape=(), dtype=float32, numpy=5.1>, 'SepalWidthCm': <tf.Tensor: shape=(), dtype=float32, numpy=3.5>, 'PetalLengthCm': <tf.Tensor: shape=(), dtype=float32, numpy=1.4>, 'PetalWidthCm': <tf.Tensor: shape=(), dtype=float32, numpy=0.2>} 

label: 
 tf.Tensor([b'Iris-setosa'], shape=(1,), dtype=string)

From unsorted single/multiple files

Consuming a table from unsorted single/multiple CSV files

In [85]:
dataset = tf.data.experimental.make_csv_dataset(
    file_pattern = ["dataset/Iris/Iris.csv","dataset/Iris/Iris.csv"],
    select_columns = ['SepalLengthCm', 'SepalWidthCm', 'PetalLengthCm', 'PetalWidthCm', 'Species'],
    label_name="Species",
    batch_size=1, 
    na_value="",
    num_epochs=1,
    ignore_errors=True)

for features,label in dataset.take(1):
    print('Feature: \n',features, '\n')
    print('label: \n',label)
    
    
Feature: 
 OrderedDict([('SepalLengthCm', <tf.Tensor: shape=(1,), dtype=float32, numpy=array([6.5], dtype=float32)>), ('SepalWidthCm', <tf.Tensor: shape=(1,), dtype=float32, numpy=array([3.2], dtype=float32)>), ('PetalLengthCm', <tf.Tensor: shape=(1,), dtype=float32, numpy=array([5.1], dtype=float32)>), ('PetalWidthCm', <tf.Tensor: shape=(1,), dtype=float32, numpy=array([2.], dtype=float32)>)]) 

label: 
 tf.Tensor([b'Iris-virginica'], shape=(1,), dtype=string)

Consuming text lines from unsorted single/multiple txt/csv files

In [150]:
dataset = tf.data.TextLineDataset(
    filenames = ["Dataset/Corporate_messaging_DFE/Corporate_messaging_DFE.csv"]
    ).skip(1) # skip the header row

for line in dataset.take(1):
    print('line: \n',line, '\n')

    
line: 
 tf.Tensor(b'662822308,False,finalized,3,2/18/2015 4:31,Information,1.0,,4.37e+17,Barclays,Barclays CEO stresses the importance of regulatory and cultural reform in financial services at Brussels conference  http://t.co/Ge9Lp7hpyG', shape=(), dtype=string) 
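
Each element above is still a raw CSV line. A hedged follow-up sketch, splitting the lines into string fields with a parallel map (tf.io.decode_csv would be the safer choice for free-text columns that may contain commas):

# Split each line into string fields; the mapping runs in parallel on the CPU.
parsed = dataset.map(lambda line: tf.strings.split(line, ","),
                     num_parallel_calls=tf.data.AUTOTUNE)
for fields in parsed.take(1):
    print('number of fields:', fields.shape[0])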

From sorted multiple files

Consuming a generator from sorted multiple small text files

In [288]:
dataset = tf.keras.preprocessing.text_dataset_from_directory(
    './Dataset/stack_overflow_16k/train',
    batch_size=1,
    validation_split=0.2,
    subset='training',
    seed=42)

for features,label in dataset.take(1):
    print('Feature: \n',features, '\n')
    print('label: \n',label)
Found 8000 files belonging to 4 classes.
Using 6400 files for training.
Feature: 
 tf.Tensor([b'"trying to convert a list of date(actrived from database) to format(yyyy-mm) public list&lt;date&gt; getdate(){.    @suppresswarnings(""unchecked"").    list&lt;string&gt; cr =          (list&lt;string&gt;)getsession().createcriteria(orderentity.class).            .setprojection(projections.projectionlist().                    .add(property.forname(""ordertime"")).                    ).            .list();.    system.out.println(""zxcv"");.    system.out.println(cr);//the output is here [2017-10-23 15:15:53.0, 2017-10-25 11:53:56.0, 2017-10-25 11:54:35.0}.    list&lt;date&gt; dates = new arraylist(cr.size());.    simpledateformat sdf = new simpledateformat(""yyyy-mm"");.    for (string datestring : cr) {.        dates.add(sdf.parse(datestring));.    }.    system.out.println(dates);.    return dates;.}...there is the error :...  blank.sql.timestamp cannot be cast to blank.lang.string....please tell me how to solve this error,.thank you so much."\n'], shape=(1,), dtype=string) 

label: 
 tf.Tensor([1], shape=(1,), dtype=int32)

Consuming a generator from a sorted multiple-file large dataset

In [281]:
dataset = tf.keras.preprocessing.image_dataset_from_directory(
    './Dataset/mnist_png/training',
    batch_size=1,
    image_size = (28, 28),
    color_mode="grayscale",
    label_mode='categorical',
    subset='training',
    validation_split=0.2,
    seed=42)

for features,label in dataset.take(1):
    print('Feature shape: \n',features.shape, '\n')
    print('label: \n',label)
Found 60000 files belonging to 10 classes.
Using 48000 files for training.
Feature shape: 
 (1, 28, 28, 1) 

label: 
 tf.Tensor([[0. 0. 0. 0. 0. 0. 1. 0. 0. 0.]], shape=(1, 10), dtype=float32)
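
Because image_dataset_from_directory() returns an ordinary tf.data.Dataset, the caching and prefetching techniques from the summary can be chained on directly; a one-line sketch:

# Cache decoded images in RAM after the first epoch and prefetch the next batch.
dataset = dataset.cache().prefetch(buffer_size=tf.data.AUTOTUNE)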

Consuming a generator with data augmentation from a sorted multiple-file large dataset

In [287]:
image_generator = tf.keras.preprocessing.image.ImageDataGenerator(rescale=1.0/255).flow_from_directory(
    "./Dataset/mnist_png/training",
    target_size=(28, 28),
    batch_size=5,
    color_mode="grayscale",
    class_mode='categorical')
# model.fit(image_generator)

# for demo purpose
def generator():
    for features,label in image_generator:
        yield (features,label)
dataset = tf.data.Dataset.from_generator(generator,output_types=(tf.float32,tf.int32))
for features,label in dataset.take(1):
    print('Feature shape: \n',features.shape, '\n')
    print('label: \n',label)
Found 60000 images belonging to 10 classes.
Feature shape: 
 (5, 28, 28, 1) 

label: 
 tf.Tensor(
[[1 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 1 0 0 0 0]
 [0 0 0 0 0 0 1 0 0 0]
 [0 1 0 0 0 0 0 0 0 0]
 [0 0 0 1 0 0 0 0 0 0]], shape=(5, 10), dtype=int32)
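
The cell above only rescales the images. As a sketch of actual augmentation (the specific ranges below are arbitrary choices, not values from the original note), ImageDataGenerator accepts arguments such as rotation_range and the shift/zoom ranges:

augmented_generator = tf.keras.preprocessing.image.ImageDataGenerator(
    rescale=1.0/255,
    rotation_range=10,        # random rotation up to 10 degrees
    width_shift_range=0.1,    # random horizontal shift up to 10% of the width
    height_shift_range=0.1,   # random vertical shift up to 10% of the height
    zoom_range=0.1            # random zoom in/out up to 10%
).flow_from_directory(
    "./Dataset/mnist_png/training",
    target_size=(28, 28),
    batch_size=5,
    color_mode="grayscale",
    class_mode='categorical')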

Consuming a customized generator from a sorted multiple-file large dataset

reference: https://bbs.cvmart.net/topics/1545

In [272]:
import os, pathlib, cv2, random
from tensorflow.python.keras.utils.data_utils import Sequence
from sklearn.preprocessing import OneHotEncoder

class ImageDataFeeder(Sequence):

    def __init__(self, filepath, batch_size=8, imgshape=(28, 28),
                 n_channels=3, n_classes=13, shuffle=True, ):
        # initiation method
        self.filepath=filepath
        self.pathlist= [str(pathlib.Path(path)) for path in pathlib.Path(self.filepath).rglob('*.png')]
        
        self.batch_size = batch_size
        self.imgshape = imgshape
        self.n_channels = n_channels
        self.label_dict = self._generate_label_dict(self.filepath)
        
        self.shuffle = shuffle
        self.on_epoch_end()

    def __getitem__(self, index):
        # generate batch index
        indexes = self.indexes[index * self.batch_size:(index + 1) * self.batch_size]
        # generate list of batch index
        batch_pathlist = [self.pathlist[k] for k in indexes]
        # generate data
        images = self._generate_images(batch_pathlist)
        labels = self._generate_labels(batch_pathlist)
        return images, labels
    
    def __len__(self):
        # return the number of batch
        return int(np.floor(len(self.pathlist) / self.batch_size))
    
    def _generate_label_dict(self, filepath):
        # get index of label 
        label_names = sorted(set(os.listdir(filepath)))
        label_to_onehot = OneHotEncoder(sparse = False)
        label_to_onehot.fit(np.array(label_names).reshape(-1, 1))
        label_dict = dict(zip(label_names, label_to_onehot.fit_transform(np.array(label_names).reshape(-1, 1))))
        return label_dict

    def _load_image(self, image_path):
        def gasuss_noise(image, mean=0, var=0.01):
            noise = np.random.normal(mean, var ** 0.5, image.shape)
            out = image + noise
            if out.min() < 0:
                low_clip = -1.
            else:
                low_clip = 0.
            out = np.clip(out, low_clip, 1.0)
            return out
        img = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)/255 # grey 1 channel
        img = cv2.flip(img, flipCode= random.randint(-1,1)) # random flip: both axes (-1), up-down (0), or left-right (1)
#         img = rotate(img, angle=random.randint(-5,5), mode='reflect') # rotate
        img = cv2.warpAffine(img,
                             M = np.float32([[1, 0, random.randint(-28,28)],
                                             [0, 1, random.randint(-28,28)]]),
                             dsize = (img.shape[1], img.shape[0])) # dsize expects (width, height)
        img = gasuss_noise(img, var = random.randint(1,10)/1000)
        if self.imgshape != img.shape:
            img = cv2.resize(img, self.imgshape)
            
        return np.expand_dims(img, -1)
    
    def _generate_images(self, batch_pathlist):
        # generate images for a batch
        images = np.empty((self.batch_size, *self.imgshape, self.n_channels))
        for i, path in enumerate(batch_pathlist):
            images[i,] = self._load_image(path)
        return images

    def _generate_labels(self, batch_pathlist):
        # generate labels for a batch
        labels = np.empty((self.batch_size, len(self.label_dict) ), dtype=int)
        for i, path in enumerate(batch_pathlist):
            # Store the one-hot label looked up from the parent folder name
            labels[i,] = self.label_dict.get(pathlib.Path(path).parent.name)
        return labels

    def on_epoch_end(self):
        # update index at the end of each epoch
        self.indexes = np.arange(len(self.pathlist))
        if self.shuffle == True:
            np.random.shuffle(self.indexes)
            
# Parameters
params = {'batch_size': 5,
        'n_channels': 1,
        'n_classes': 10,
        'shuffle': True,
        'imgshape': (28,28)}
image_generator = ImageDataFeeder('./Dataset/mnist_png/training', **params)
# model.fit(image_generator)

# for demo purpose
def generator():
    for features,label in image_generator:
        yield (features,label)
dataset = tf.data.Dataset.from_generator(generator,output_types=(tf.float32,tf.int32))
for features,label in dataset.take(1):
    print('Feature shape: \n',features.shape, '\n')
    print('label: \n',label)
Feature shape: 
 (5, 28, 28, 1) 

label: 
 tf.Tensor(
[[0 0 0 0 0 0 1 0 0 0]
 [0 1 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 1]
 [0 1 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 1 0 0]], shape=(5, 10), dtype=int32)