0

I am using TensorFlow/Keras to create a deep learning model. The network is built as follows:

inps = [] 
features = [] 

for i in range(number_windows):

    inp = Input(shape=(window_length,), name=f"input_{i}")
    inps.append(inp)

    feat = Dense(25)(inp)
    feat = BatchNormalization()(feat)
    feat = LeakyReLU()(feat)
    features.append(feat)

comb = concatenate(features)
comb = Dropout(0.50)(comb)

top = Dense(512)(comb)
top = BatchNormalization()(top)
top = LeakyReLU()(top)
top = Dropout(0.40)(top)

top = Dense(256)(top)

emb = EmbeddingLayer()(top)
    
top = BatchNormalization()(top)
top = LeakyReLU()(top)
top = Dropout(0.25)(top)

classification = Dense(n_classes, activation='softmax', name='classification')(top)

mdl = Model(inputs=inps, outputs=[emb, classification])

The EmbeddingLayer is a custom layer that effectively returns an L2 normalization of the input. I have a data generating function:

def data_loading_generator(
    data_matrix: np.typing.NDArray,
    data_labels: np.typing.NDArray,
    window_length,
    dw
):
    num_rows = data_matrix.shape[0]
    y_onehot = np.stack(
        [np.flip(data_labels), data_labels],
        axis=1
    )
    data_segments = segment_data_batch(
        data_mat=data_matrix,
        w=window_length,
        dw=dw
    )
    for row_number in range(0, num_rows):
        yield (
            {f"input_{ii}": x[row_number, :] for ii, x in enumerate(data_segments)},
            (
                {
                    "embedding_layer": data_labels[row_number],
                    "classification": y_onehot[row_number, :]
                }
            )
        )

The function segment_data_batch takes in a matrix and outputs a list of overlapping segments from each row of the matrix, length window_length, and overlap window_length - dw. I believe I can optimize this a little by removing the segment_data_batch function and simply segmenting each row of the matrix as they are generated:

def data_loading_generator(
    data_matrix: np.typing.NDArray,
    data_labels: np.typing.NDArray,
    window_length,
    dw
):
    num_rows = data_matrix.shape[0]
    for row_number in range(0, num_rows):
        data_segments = segment_data(
            spectra_matrix[row_number, :], w=window_length, dw=dw
        )
        yield (
            {f"input_{ii}": data_segments[ii, :] for ii in range(data_segments.shape[0])},
            (
                {
                     "embedding_layer": data_labels[row_number],
                     "classification": tf.one_hot(
                         data_labels[row_number], depth=2, dtype=tf.uint16
                     )
                }
            )
        )

The new function segment_data takes a single row in the data_matrix and returns a numpy array number_windows x window_length. However, I'm wondering if I can make this more efficient using native TensorFlow functions.

1 Answer 1

1

So if you want to keep your current architecture, you should let tf.data create the windows on the fly (per row or per batch) instead of pre-materializing them in Python lists as I said in question-79754606 (comment),

The easy way would be to build the dataset with tf.signal.frame, take this as a draft:

# X_train: (N, M) float32
# Y_train: (N,) int32  with values in [0, n_classes-1]
X_train = X_train.astype('float32')
Y_train = Y_train.astype('int32')

w_len = 50       # window_length
dw    = 5        # step
M     = X_train.shape[1]
number_windows = 1 + (M - w_len) // dw

name_list = [f"input_{i}" for i in range(number_windows)]

def row_to_inputs_dict(row, label):
    # row: (M,)
    frames = tf.signal.frame(row, frame_length=w_len, frame_step=dw, pad_end=False)  # (W, w_len)
    frames = frames[:number_windows, :]                 # force fixed number if pad_end=False
    frames = tf.ensure_shape(frames, [number_windows, w_len])

    # Split into your multi-input dict
    pieces = tf.unstack(frames, axis=0)                 # list of (w_len,)
    inputs = {name_list[i]: pieces[i] for i in range(number_windows)}

    # Outputs dict keyed by layer names
    outputs = {
        "embedding_layer": tf.cast(label, tf.int32),    # for your embedding loss/head
        "classification": tf.one_hot(label, depth=n_classes, dtype=tf.float32),
    }
    return inputs, outputs

def build_dataset(X, y, batch_size=32, shuffle=True):
    ds = tf.data.Dataset.from_tensor_slices((X, y))
    if shuffle:
        ds = ds.shuffle(min(len(X), 10_000))
    ds = ds.map(row_to_inputs_dict, num_parallel_calls=tf.data.AUTOTUNE)
    ds = ds.batch(batch_size).prefetch(tf.data.AUTOTUNE)
    return ds

Then to build it:

train_ds = build_dataset(X_train, Y_train, batch_size=32)

and use it:

mdl.fit(train_ds, epochs=..., validation_data=...)

P.S: I'll edit this answer later with an option 2 that may be better.

Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.