
I am following the TFF tutorials to build my FL model. My data is contained in different CSV files, which are treated as different clients. Following this tutorial, I built the Keras model function as follows:

@tf.function
def create_tf_dataset_for_client_fn(dataset_path):
  return tf.data.experimental.CsvDataset(dataset_path,
                                         record_defaults=record_defaults,
                                         header=True)

@tf.function
def add_parsing(dataset):
  def parse_dataset(*x):
    return OrderedDict([('y', x[-1]), ('x', x[1:-1])])
  return dataset.map(parse_dataset, num_parallel_calls=tf.data.AUTOTUNE)

source = tff.simulation.datasets.FilePerUserClientData(
    dataset_paths, create_tf_dataset_for_client_fn)

client_ids = sorted(source.client_ids)

# Make sure the client ids are tensor strings when splitting data.
source._client_ids = [tf.cast(c, tf.string) for c in source.client_ids] 
source = source.preprocess(add_parsing)

train, test = source.train_test_client_split(source, 1)

train_client_ids = train.client_ids

train_data = train.create_tf_dataset_for_client(train_client_ids[0])

def create_keras_model():
  initializer = tf.keras.initializers.GlorotNormal(seed=0)
  return tf.keras.models.Sequential([
      tf.keras.layers.Input(shape=(32,)),
      tf.keras.layers.Dense(10, kernel_initializer=initializer),
      tf.keras.layers.Softmax(),
  ])
def model_fn():
  keras_model = create_keras_model()
  return tff.learning.from_keras_model(
      keras_model,
      input_spec=train_data.element_spec,
      loss=tf.keras.losses.SparseCategoricalCrossentropy(),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

Then I followed the instructions and wrote the other @tff.tf_computation functions as in the tutorial, such as server_init(), initialize_fn(), client_update(), and server_update(). But when I run client_update_fn() I get this error:

ValueError: in user code:

   File "<ipython-input-14-cada45ffae0f>", line 12, in client_update  *
       for batch in dataset:
   File "/usr/local/lib/python3.7/dist-packages/tensorflow_federated/python/learning/keras_utils.py", line 455, in forward_pass  *
       return self._forward_pass(batch_input, training=training)
   File "/usr/local/lib/python3.7/dist-packages/tensorflow_federated/python/learning/keras_utils.py", line 408, in _forward_pass  *
       predictions = self.predict_on_batch(inputs, training)
   File "/usr/local/lib/python3.7/dist-packages/tensorflow_federated/python/learning/keras_utils.py", line 398, in predict_on_batch  *
       return self._keras_model(x, training=training)
   File "/usr/local/lib/python3.7/dist-packages/keras/engine/base_layer_v1.py", line 740, in __call__  **
       self.name)
   File "/usr/local/lib/python3.7/dist-packages/keras/engine/input_spec.py", line 200, in assert_input_compatibility
       raise ValueError(f'Layer "{layer_name}" expects {len(input_spec)} input(s),'

ValueError: Layer "sequential" expects 1 input(s), but it received 10 input tensors. Inputs received: [<tf.Tensor 'x:0' shape=() dtype=int32>, <tf.Tensor 'x_1:0' shape=() dtype=int32>, <tf.Tensor 'x_2:0' shape=() dtype=int32>, <tf.Tensor 'x_3:0' shape=() dtype=float32>, <tf.Tensor 'x_4:0' shape=() dtype=float32>, <tf.Tensor 'x_5:0' shape=() dtype=float32>, <tf.Tensor 'x_6:0' shape=() dtype=float32>, <tf.Tensor 'x_7:0' shape=() dtype=float32>, <tf.Tensor 'x_8:0' shape=() dtype=float32>, <tf.Tensor 'x_9:0' shape=() dtype=int32>]

Notes:

  • Each CSV file has 10 columns as features (input) and one column as label (output).
  • I added shape=(32,) arbitrarily; I don't actually know what the shape of the data in each column is.

So, the question is: how do I feed the data to the Keras model and overcome this error?

Thanks in advance

1 Answer


A couple of problems: your data has ten separate features, which means you would actually need 10 separate inputs for your model. Alternatively, you can stack the features into a single tensor and then use one input with shape (10,). Here is a working example, but please note that it uses dummy data and therefore may not make much sense in reality.
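To see the stacking idea in isolation, here is a minimal standalone sketch (not part of the pipeline below) of what the model receives in each case:

import tensorflow as tf

# Ten separate scalar tensors, as CsvDataset yields them per row:
features = tuple(tf.constant(float(i)) for i in range(10))

# Stacked into a single rank-1 tensor that one Input(shape=(10,)) layer can consume:
stacked = tf.stack([tf.cast(f, tf.float32) for f in features])
print(stacked.shape)  # (10,)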

Create dummy data:

import tensorflow as tf
import tensorflow_federated as tff
import pandas as pd
from collections import OrderedDict
import nest_asyncio
nest_asyncio.apply()

# Dummy data
samples = 5
data = [[tf.random.uniform((samples,), maxval=50, dtype=tf.int32).numpy().tolist(),
        tf.random.uniform((samples,), maxval=50, dtype=tf.int32).numpy().tolist(),
        tf.random.uniform((samples,), maxval=50, dtype=tf.int32).numpy().tolist(),
        tf.random.uniform((samples,), maxval=50, dtype=tf.int32).numpy().tolist(),
        tf.random.normal((samples,)).numpy().tolist(),
        tf.random.normal((samples,)).numpy().tolist(),
        tf.random.normal((samples,)).numpy().tolist(),
        tf.random.normal((samples,)).numpy().tolist(),
        tf.random.normal((samples,)).numpy().tolist(),
        tf.random.normal((samples,)).numpy().tolist(),
        tf.random.uniform((samples,), maxval=50, dtype=tf.int32).numpy().tolist(),
        tf.random.uniform((samples,), maxval=50, dtype=tf.int32).numpy().tolist()]]
df = pd.DataFrame(data)
df = df.explode(list(df.columns))
df.to_csv('client1.csv', index= False)
df.to_csv('client2.csv', index= False)
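To sanity-check the generated files, you can read one back; there should be 12 columns, of which the parser below drops the first, uses the middle ten as features, and takes the last as the label:

print(pd.read_csv('client1.csv').head())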

Load and process dataset:

import tensorflow as tf

# 12 column types: the first column is dropped by the parser below,
# the middle ten are the features, and the last column is the label.
record_defaults = [int(), int(), int(), int(), float(), float(), float(), float(), float(), float(), int(), int()]

@tf.function
def create_tf_dataset_for_client_fn(dataset_path):
   return tf.data.experimental.CsvDataset(dataset_path, 
                                          record_defaults=record_defaults,
                                          header=True)
@tf.function
def add_parsing(dataset):
  def parse_dataset(*x):
    return OrderedDict([('y', x[-1]), ('x', x[1:-1])])
  return dataset.map(parse_dataset, num_parallel_calls=tf.data.AUTOTUNE)

dataset_paths = {'client1': '/content/client1.csv', 'client2': '/content/client2.csv'}

source = tff.simulation.datasets.FilePerUserClientData(
  dataset_paths, create_tf_dataset_for_client_fn) 

client_ids = sorted(source.client_ids)

# Make sure the client ids are tensor strings when splitting data.
source._client_ids = [tf.cast(c, tf.string) for c in source.client_ids] 
source = source.preprocess(add_parsing)

train, test = source.train_test_client_split(source, 1)

train_client_ids = train.client_ids

# Stack the ten scalar features into a single (10,) float32 tensor per example.
def reshape_data(d):
  d['x'] = tf.stack([tf.cast(x, dtype=tf.float32) for x in d['x']])
  return d

train_data = [train.create_tf_dataset_for_client(c).map(reshape_data).batch(1)
              for c in train_client_ids]
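At this point it is worth confirming that the parsing produced the expected element structure (the exact output below is indicative, assuming the mapping above):

print(train_data[0].element_spec)
# OrderedDict([('y', TensorSpec(shape=(None,), dtype=tf.int32, name=None)),
#              ('x', TensorSpec(shape=(None, 10), dtype=tf.float32, name=None))])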

Create and run model:

def create_keras_model():
  initializer = tf.keras.initializers.GlorotNormal(seed=0)
  return tf.keras.models.Sequential([
      tf.keras.layers.Input(shape=(10,)),
      tf.keras.layers.Dense(75, kernel_initializer=initializer),
      tf.keras.layers.Dense(50, kernel_initializer=initializer),
      tf.keras.layers.Softmax(),
  ])
def model_fn():
  keras_model = create_keras_model()
  return tff.learning.from_keras_model(
      keras_model,
      input_spec=train_data[0].element_spec,
      loss=tf.keras.losses.SparseCategoricalCrossentropy(),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

# Note: this plain helper is superseded by the @tff.federated_computation
# version of initialize_fn defined further below.
def initialize_fn():
  model = model_fn()
  return model.trainable_variables

@tf.function
def client_update(model, dataset, server_weights, client_optimizer):
  """Performs training (using the server model weights) on the client's dataset."""
  client_weights = model.trainable_variables
  tf.nest.map_structure(lambda x, y: x.assign(y),
                        client_weights, server_weights)

  for batch in dataset:
    with tf.GradientTape() as tape:
      outputs = model.forward_pass(batch)

    grads = tape.gradient(outputs.loss, client_weights)
    grads_and_vars = zip(grads, client_weights)
    client_optimizer.apply_gradients(grads_and_vars)

  return client_weights

@tf.function
def server_update(model, mean_client_weights):
  """Updates the server model weights as the average of the client model weights."""
  model_weights = model.trainable_variables
  tf.nest.map_structure(lambda x, y: x.assign(y),
                        model_weights, mean_client_weights)
  return model_weights

# Sanity check from the tutorial: a simple federated mean over client-placed floats.
federated_float_on_clients = tff.FederatedType(tf.float32, tff.CLIENTS)

@tff.federated_computation(federated_float_on_clients)
def get_average_temperature(client_temperatures):
  return tff.federated_mean(client_temperatures)

str(get_average_temperature.type_signature)
get_average_temperature([68.5, 70.3, 69.8])

@tff.tf_computation
def server_init():
  model = model_fn()
  return model.trainable_variables

@tff.federated_computation
def initialize_fn():
  return tff.federated_value(server_init(), tff.SERVER)

whimsy_model = model_fn()
tf_dataset_type = tff.SequenceType(whimsy_model.input_spec)
model_weights_type = server_init.type_signature.result
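# Optional sanity check: print the derived TFF types; mismatches between the
# dataset element_spec and the model's input_spec usually show up here first.
print(str(tf_dataset_type))
print(str(model_weights_type))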

@tff.tf_computation(tf_dataset_type, model_weights_type)
def client_update_fn(tf_dataset, server_weights):
  model = model_fn()
  client_optimizer = tf.keras.optimizers.SGD(learning_rate=0.01)
  return client_update(model, tf_dataset, server_weights, client_optimizer)

@tff.tf_computation(model_weights_type)
def server_update_fn(mean_client_weights):
  model = model_fn()
  return server_update(model, mean_client_weights)

federated_server_type = tff.FederatedType(model_weights_type, tff.SERVER)
federated_dataset_type = tff.FederatedType(tf_dataset_type, tff.CLIENTS)

@tff.federated_computation(federated_server_type, federated_dataset_type)
def next_fn(server_weights, federated_dataset):
  server_weights_at_client = tff.federated_broadcast(server_weights)
  client_weights = tff.federated_map(
      client_update_fn, (federated_dataset, server_weights_at_client))
  mean_client_weights = tff.federated_mean(client_weights)

  server_weights = tff.federated_map(server_update_fn, mean_client_weights)
  return server_weights

federated_algorithm = tff.templates.IterativeProcess(
    initialize_fn=initialize_fn,
    next_fn=next_fn
)

server_state = federated_algorithm.initialize()
for round_num in range(15):
  server_state = federated_algorithm.next(server_state, train_data)
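After training, you can check the learned weights against the held-out test client. This is a minimal evaluation sketch, assuming the same parsing as above, a single test client, and a model with no non-trainable variables:

eval_model = create_keras_model()
eval_model.compile(loss=tf.keras.losses.SparseCategoricalCrossentropy(),
                   metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

# Copy the trained server weights into the Keras model.
for var, value in zip(eval_model.trainable_variables, server_state):
  var.assign(value)

# Build the test client's dataset the same way, then convert the OrderedDict
# batches into the (x, y) tuples that Keras expects.
test_data = (test.create_tf_dataset_for_client(test.client_ids[0])
             .map(reshape_data)
             .batch(1)
             .map(lambda d: (d['x'], d['y'])))
eval_model.evaluate(test_data)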

Regarding this line in the model, tf.keras.layers.Dense(50, kernel_initializer=initializer): I am using 50 output nodes since I created dummy labels that can vary between 0 and 49. When using the SparseCategoricalCrossentropy loss function, the labels must lie in the range [0, number_of_output_nodes).
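As the comments below illustrate, labels that start at 1 rather than 0 trigger an "outside the valid range" error. One way to handle real labels encoded as 1 to 3 (an assumption based on the comment thread) is to shift them to 0 to 2 in the parsing step and use a 3-node output layer:

def parse_dataset(*x):
  # Shift labels 1..3 down to 0..2 so they fit a Dense(3) output layer.
  return OrderedDict([('y', x[-1] - 1), ('x', x[1:-1])])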


Comments

Thanks for your answer. I got an error after running this loop: server_state = federated_algorithm.next(server_state, [train_data]). The error message is: Received a label value of 3 which is outside the valid range of [0, 3). Label values: 3 [[{{node sparse_categorical_crossentropy/SparseSoftmaxCrossEntropyWithLogits/SparseSoftmaxCrossEntropyWithLogits}}]]. Note that I have replaced (50) with (3) since I have 3 labels in my data.
@EdenJ did you make sure all your labels are between 0 and 2? You always start counting from zero.
They are actually between 1 and 3.
Well, that is the problem then :). Here is an explanation: stackoverflow.com/questions/56948192/… Change your labels and it will work.
Ok, that makes sense. I will change it and check.
