View this notebook on GitHub

Adding a new model to the existing DataLabeler pipeline

Consider the case where we would like to explore different character-level neural network models and evaluate their performance on different datasets. The existing DataLabeler in the DataProfiler library already combines a preprocessor, a postprocessor, and a character-level CNN (Convolutional Neural Network) model to work on such data. All we need to do is build additional model classes that inherit the main functionality from the CNN model and adapt the model construction to the desired architectures. In this example, we define such a new model to be used with the Data Labeler component of the Data Profiler. In particular, a character-level LSTM (Long Short-Term Memory) model is implemented and then integrated into the DataLabeler pipeline to be trained on a tabular dataset. The process includes the following steps:

- Build a new character-level LSTM model that inherits from the CNN model
- Load the DataLabeler from the DataProfiler
- Swap the existing CNN model with the new LSTM model
- Train the data labeler pipeline on a given dataset

First, let’s import the libraries needed for this example.

[ ]:
import os
import sys
import json
import pandas as pd
sys.path.insert(0, '..')
import dataprofiler as dp

Dataset

In this example, we use a structured dataset, the AWS honeypot dataset, provided in the test folder of the library. This dataset is first read by the Data Reader class of the Data Profiler, then split into training and test sets to be used in the following sections.

[ ]:
# use data reader to read input data
data = dp.Data("../dataprofiler/tests/data/csv/aws_honeypot_marx_geo.csv")
df_data = data.data

# split data to training and test set
split_ratio = 0.2
df_data = df_data.sample(frac=1).reset_index(drop=True)
data_train = df_data[:int((1 - split_ratio) * len(df_data))]
data_test = df_data[int((1 - split_ratio) * len(df_data)):]

df_data.head()

Implement a new character-level LSTM model

This new model inherits from the CharacterLevelCnnModel class, with modifications to the following functions:

__init__: to add new parameters for the LSTM model. The new parameters size_lstm, rec_dropout, activation, and recurrent_activation specify the sizes of the LSTM layers, the recurrent dropout ratio, and the activation and recurrent activation functions, respectively.

_validate_parameters: to add additional checks on the new parameters for the LSTM model

_construct_model: to construct the new LSTM model with the desired architecture

[ ]:
import tensorflow as tf
import numpy as np
from dataprofiler.labelers.character_level_cnn_model import CharacterLevelCnnModel, F1Score, \
                                                            create_glove_char, build_embd_dictionary
from dataprofiler.labelers.base_model import BaseModel

# CharacterLevelLstmModel derives from CharacterLevelCnnModel
#########################################################
#########################################################
class CharacterLevelLstmModel(CharacterLevelCnnModel):

    # boolean if the label mapping requires the mapping for index 0 reserved
    requires_zero_mapping = True

    def __init__(self, label_mapping=None, parameters=None):
        """
        LSTM Model Initializer
        """

        # parameter initialization
        if not parameters:
            parameters = {}
        parameters.setdefault('max_length', 3400)
        parameters.setdefault('max_char_encoding_id', 127)
        parameters.setdefault('dim_embed', 64)
        parameters.setdefault('size_fc', [32, 32])
        parameters.setdefault('dropout', 0.1)
        # new parameters for LSTM model
        #########################################################
        #########################################################
        parameters.setdefault('size_lstm', [64])
        parameters.setdefault('rec_dropout', 0.1)
        parameters.setdefault('activation', "tanh")
        parameters.setdefault('recurrent_activation', "sigmoid")
        #########################################################
        #########################################################
        parameters.setdefault('default_label', "UNKNOWN")
        parameters['pad_label'] = 'PAD'
        self._epoch_id = 0

        # reconstruct flags for model
        self._model_num_labels = 0
        self._model_default_ind = -1

        BaseModel.__init__(self, label_mapping, parameters)

    def _validate_parameters(self, parameters):
        """
        Validate the parameters sent in. Raise error if invalid parameters are
        present.
        """
        errors = []
        list_of_necessary_params = ['max_length', 'max_char_encoding_id',
                                    'dim_embed', 'size_fc', 'dropout',
                                    'size_lstm', 'rec_dropout', 'activation',
                                    'recurrent_activation', 'default_label',
                                    'pad_label']
        # Make sure the necessary parameters are present and valid.
        for param in parameters:
            if param in ['max_length', 'max_char_encoding_id', 'dim_embed',
                         'size_conv']:
                if not isinstance(parameters[param], (int, float)) \
                        or parameters[param] < 0:
                    errors.append(param + " must be a valid integer or float "
                                          "greater than 0.")
            elif param in ['dropout', 'rec_dropout']: # additional check for rec_dropout
                if not isinstance(parameters[param], (int, float)) \
                        or parameters[param] < 0 or parameters[param] > 1:
                    errors.append(param + " must be a valid integer or float "
                                          "from 0 to 1.")
            elif param == 'size_fc' or param == 'size_lstm': # additional check for size_lstm
                if not isinstance(parameters[param], list) \
                        or len(parameters[param]) == 0:
                    errors.append(param + " must be a non-empty list of "
                                          "integers.")
                else:
                    for item in parameters[param]:
                        if not isinstance(item, int):
                            errors.append(param + " must be a non-empty "
                                                  "list of integers.")
                            break
            elif param in ['default_label', 'activation', 'recurrent_activation']: # additional check for activation and recurrent_activation
                if not isinstance(parameters[param], str):
                    error = str(param) + " must be a string."
                    errors.append(error)

        # Error if there are extra parameters thrown in
        for param in parameters:
            if param not in list_of_necessary_params:
                errors.append(param + " is not an accepted parameter.")
        if errors:
            raise ValueError('\n'.join(errors))

    def _construct_model(self):
        """
        Model constructor for the data labeler. This also serves as a weight
        reset.

        :return: None
        """
        num_labels = self.num_labels
        default_ind = self.label_mapping[self._parameters['default_label']]

        # Reset model
        tf.keras.backend.clear_session()

        # generate glove embedding
        create_glove_char(self._parameters['dim_embed'])

        # generate model
        self._model = tf.keras.models.Sequential()

        # default parameters
        max_length = self._parameters['max_length']
        max_char_encoding_id = self._parameters['max_char_encoding_id']

        # Encoding layer
        def encoding_function(input_str):
            char_in_vector = CharacterLevelLstmModel._char_encoding_layer(
                input_str, max_char_encoding_id, max_length)
            return char_in_vector

        self._model.add(tf.keras.layers.Input(shape=(None,), dtype=tf.string))

        self._model.add(
            tf.keras.layers.Lambda(encoding_function,
                                   output_shape=tuple([max_length])))

        # Create a pre-trained weight matrix
        # character encoding indices range from 0 to max_char_encoding_id,
        # we add one extra index for out-of-vocabulary character
        embed_file = os.path.join(
            "../dataprofiler/labelers", "embeddings/glove-reduced-{}D.txt".format(
                self._parameters['dim_embed']))
        embedding_matrix = np.zeros((max_char_encoding_id + 2,
                                     self._parameters['dim_embed']))
        embedding_dict = build_embd_dictionary(embed_file)

        input_shape = tuple([max_length])
        # Fill in the weight matrix: let pad and space be 0s
        for ascii_num in range(max_char_encoding_id):
            if chr(ascii_num) in embedding_dict:
                embedding_matrix[ascii_num + 1] = embedding_dict[chr(ascii_num)]

        self._model.add(tf.keras.layers.Embedding(
            max_char_encoding_id + 2,
            self._parameters['dim_embed'],
            weights=[embedding_matrix],
            input_length=input_shape[0],
            trainable=True))

        # Add the lstm layers
        #########################################################
        #########################################################
        for size in self._parameters['size_lstm']:
            self._model.add(
                tf.keras.layers.LSTM(units=size,
                                     recurrent_dropout=self._parameters['rec_dropout'],
                                     activation=self._parameters['activation'],
                                     recurrent_activation=self._parameters['recurrent_activation'],
                                     return_sequences=True))
            if self._parameters['dropout']:
                self._model.add(tf.keras.layers.Dropout(self._parameters['dropout']))
        #########################################################
        #########################################################

        # Add the fully connected layers
        for size in self._parameters['size_fc']:
            self._model.add(
                tf.keras.layers.Dense(units=size, activation='relu'))
            if self._parameters['dropout']:
                self._model.add(
                    tf.keras.layers.Dropout(self._parameters['dropout']))

        # Add the final Softmax layer
        self._model.add(
            tf.keras.layers.Dense(num_labels, activation='softmax'))

        # Output the model into a .pb file for TensorFlow
        argmax_layer = tf.keras.backend.argmax(self._model.output)

        # Create confidence layers
        final_predicted_layer = CharacterLevelLstmModel._argmax_threshold_layer(
            num_labels, threshold=0.0, default_ind=default_ind)

        argmax_outputs = self._model.outputs + \
                         [argmax_layer,
                          final_predicted_layer(argmax_layer, self._model.output)]
        self._model = tf.keras.Model(self._model.inputs, argmax_outputs)

        # Compile the model
        softmax_output_layer_name = self._model.outputs[0].name.split('/')[0]
        losses = {softmax_output_layer_name: "categorical_crossentropy"}

        # use f1 score metric
        f1_score_training = F1Score(num_classes=num_labels, average='micro')
        metrics = {softmax_output_layer_name: ['acc', f1_score_training]}

        self._model.compile(loss=losses,
                            optimizer="adam",
                            metrics=metrics)

        self._epoch_id = 0
        self._model_num_labels = num_labels
        self._model_default_ind = default_ind
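
Before wiring the new model into the DataLabeler, we can quickly sanity-check the parameter validation added above. The cell below is only an illustrative sketch: the label names are arbitrary placeholders, and an out-of-range rec_dropout is expected to be rejected by _validate_parameters with a ValueError.

[ ]:
# hypothetical labels used only for this quick check
example_labels = ['PAD', 'UNKNOWN', 'comment']

# an out-of-range recurrent dropout should raise a ValueError
try:
    CharacterLevelLstmModel(
        label_mapping=example_labels,
        parameters={'default_label': 'UNKNOWN', 'rec_dropout': 1.5})
except ValueError as e:
    print(e)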

Integrate the new LSTM model into the DataLabeler

Once the LSTM model is built, it replaces the existing CNN model in the DataLabeler pipeline, and the pipeline is then trained on the given dataset. Note that, since the DataLabeler is trained on the tabular dataset above, its label mapping is updated with the list of column names of that dataset during training.

[ ]:
# get labels from the given dataset
value_label_df = data_train.reset_index(drop=True).melt()
value_label_df.columns = [1, 0]  # labels=1, values=0 in that order
value_label_df = value_label_df.astype(str)
labels = value_label_df[1].unique().tolist()

# create a new LSTM model
# set default label (one of the column names) to the model
model = CharacterLevelLstmModel(label_mapping=labels, parameters={'default_label': 'comment'})

# add the new LSTM model to the data labeler
data_labeler = dp.DataLabeler(labeler_type='structured', trainable=True)
data_labeler.set_model(model)

# set default label (one of the column names) to the preprocessor and postprocessor
processor_params = {'default_label': 'comment'}
data_labeler._preprocessor.set_params(**processor_params)
data_labeler._postprocessor.set_params(**processor_params)

# train the data labeler
save_dirpath = "data_labeler_saved"
if not os.path.exists(save_dirpath):
    os.makedirs(save_dirpath)

epochs = 2
data_labeler.fit(
    x=value_label_df[0], y=value_label_df[1], labels=labels, epochs=epochs)
if save_dirpath:
    data_labeler.save_to_disk(save_dirpath)
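
Optionally, we can run the freshly trained labeler directly on a few raw string values to see which column labels it assigns. The sample values below are made up for illustration, and the predictions may well be inaccurate after only two epochs of training.

[ ]:
# predict column labels for a few example string values (illustrative only)
sample_values = ['192.168.0.1', '2013-03-01 21:30:00', 'United States']
predictions = data_labeler.predict(sample_values)
print(predictions['pred'])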

The trained Data Labeler is then used by the Data Profiler to provide predictions on the new dataset. In this example, all options except the data labeler are disabled for the sake of presenting the data labeler functionality. The results are given in a columnar format where the true labels (i.e., the column names) appear in the first column and the predicted column labels in the second.

[ ]:
# predict with the data labeler object
profile_options = dp.ProfilerOptions()
profile_options.set({"structured_options.text.is_enabled": False,
                     "int.is_enabled": False,
                     "float.is_enabled": False,
                     "order.is_enabled": False,
                     "category.is_enabled": False,
                     "datetime.is_enabled": False,})
profile_options.set({'structured_options.data_labeler.data_labeler_object': data_labeler})
profile = dp.Profiler(data_test, options=profile_options)

# get the prediction from the data profiler
def get_structured_results(results):
    columns = []
    predictions = []
    for col_report in results['data_stats']:
        columns.append(col_report['column_name'])
        predictions.append(col_report['data_label'])

    df_results = pd.DataFrame({'Column': columns, 'Prediction': predictions})
    return df_results

results = profile.report()
print(get_structured_results(results))

In summary, users can define their own model, plug it into the DataLabeler pipeline, and train the labeler on a new dataset. Above, we showed one example of adding an LSTM model to the pipeline. Interested users can implement other neural network models as desired following the same process.