From b73d1c242bad3e40561cd4c2ab60176ccb08018e Mon Sep 17 00:00:00 2001 From: Omid Jafari Date: Mon, 9 Jun 2025 11:31:23 -0500 Subject: [PATCH] Add tutorial for federated fine-tuning of language models with HuggingFace Signed-off-by: Omid Jafari --- .../HuggingFace/Portland/Portland_config.yaml | 6 + .../HuggingFace/Portland/data/.keep | 0 .../Portland/private_attributes.py | 6 + .../HuggingFace/Portland/start_envoy.sh | 6 + .../FederatedRuntime/HuggingFace/README.md | 61 ++ .../HuggingFace/Seattle/Seattle_config.yaml | 6 + .../HuggingFace/Seattle/data/.keep | 0 .../HuggingFace/Seattle/private_attributes.py | 6 + .../HuggingFace/Seattle/start_envoy.sh | 6 + .../HuggingFace/director/director_config.yaml | 4 + .../HuggingFace/director/start_director.sh | 4 + .../workspace/HF_FederatedRuntime.ipynb | 789 ++++++++++++++++++ 12 files changed, 894 insertions(+) create mode 100755 openfl-tutorials/experimental/workflow/FederatedRuntime/HuggingFace/Portland/Portland_config.yaml create mode 100644 openfl-tutorials/experimental/workflow/FederatedRuntime/HuggingFace/Portland/data/.keep create mode 100644 openfl-tutorials/experimental/workflow/FederatedRuntime/HuggingFace/Portland/private_attributes.py create mode 100755 openfl-tutorials/experimental/workflow/FederatedRuntime/HuggingFace/Portland/start_envoy.sh create mode 100644 openfl-tutorials/experimental/workflow/FederatedRuntime/HuggingFace/README.md create mode 100755 openfl-tutorials/experimental/workflow/FederatedRuntime/HuggingFace/Seattle/Seattle_config.yaml create mode 100644 openfl-tutorials/experimental/workflow/FederatedRuntime/HuggingFace/Seattle/data/.keep create mode 100644 openfl-tutorials/experimental/workflow/FederatedRuntime/HuggingFace/Seattle/private_attributes.py create mode 100755 openfl-tutorials/experimental/workflow/FederatedRuntime/HuggingFace/Seattle/start_envoy.sh create mode 100755 openfl-tutorials/experimental/workflow/FederatedRuntime/HuggingFace/director/director_config.yaml create mode 
100755 openfl-tutorials/experimental/workflow/FederatedRuntime/HuggingFace/director/start_director.sh create mode 100644 openfl-tutorials/experimental/workflow/FederatedRuntime/HuggingFace/workspace/HF_FederatedRuntime.ipynb diff --git a/openfl-tutorials/experimental/workflow/FederatedRuntime/HuggingFace/Portland/Portland_config.yaml b/openfl-tutorials/experimental/workflow/FederatedRuntime/HuggingFace/Portland/Portland_config.yaml new file mode 100755 index 0000000000..715db086d1 --- /dev/null +++ b/openfl-tutorials/experimental/workflow/FederatedRuntime/HuggingFace/Portland/Portland_config.yaml @@ -0,0 +1,6 @@ +settings: + director_host: localhost + director_port: 50050 + +Portland: + private_attributes: private_attributes.portland_attrs diff --git a/openfl-tutorials/experimental/workflow/FederatedRuntime/HuggingFace/Portland/data/.keep b/openfl-tutorials/experimental/workflow/FederatedRuntime/HuggingFace/Portland/data/.keep new file mode 100644 index 0000000000..e69de29bb2 diff --git a/openfl-tutorials/experimental/workflow/FederatedRuntime/HuggingFace/Portland/private_attributes.py b/openfl-tutorials/experimental/workflow/FederatedRuntime/HuggingFace/Portland/private_attributes.py new file mode 100644 index 0000000000..7578ce162a --- /dev/null +++ b/openfl-tutorials/experimental/workflow/FederatedRuntime/HuggingFace/Portland/private_attributes.py @@ -0,0 +1,6 @@ +from datasets import load_from_disk + +portland_attrs = { + "train_dataset": load_from_disk("../data/imdb_train_portland"), + "test_dataset": load_from_disk("../data/imdb_test_portland"), +} diff --git a/openfl-tutorials/experimental/workflow/FederatedRuntime/HuggingFace/Portland/start_envoy.sh b/openfl-tutorials/experimental/workflow/FederatedRuntime/HuggingFace/Portland/start_envoy.sh new file mode 100755 index 0000000000..5c2d296cda --- /dev/null +++ b/openfl-tutorials/experimental/workflow/FederatedRuntime/HuggingFace/Portland/start_envoy.sh @@ -0,0 +1,6 @@ +#!/bin/bash +set -e +ENVOY_NAME=$1 
+ENVOY_CONF=$2 + +fx envoy start -n "$ENVOY_NAME" --disable-tls -c "$ENVOY_CONF" diff --git a/openfl-tutorials/experimental/workflow/FederatedRuntime/HuggingFace/README.md b/openfl-tutorials/experimental/workflow/FederatedRuntime/HuggingFace/README.md new file mode 100644 index 0000000000..2896b4e874 --- /dev/null +++ b/openfl-tutorials/experimental/workflow/FederatedRuntime/HuggingFace/README.md @@ -0,0 +1,61 @@ +# HuggingFace + +## **How to run this tutorial (without TLS and locally as a simulation):** +
+ +### 0. If you haven't done so already, create a virtual environment, install OpenFL, and upgrade pip: + - For help with this step, visit the "Install the Package" section of the [OpenFL installation instructions](https://openfl.readthedocs.io/en/latest/installation.html). + +
+ +### 1. Split terminal into 4 (1 terminal for the director, 2 for the envoys, and 1 for the experiment) + +
+ +### 2. Do the following in each terminal: + - Activate the virtual environment from step 0: + + ```sh + source venv/bin/activate + ``` + - If you are in a network environment with a proxy, ensure proxy environment variables are set in each of your terminals. + - Navigate to the tutorial: + + ```sh + cd openfl/openfl-tutorials/experimental/workflow/FederatedRuntime/HuggingFace/ + ``` + +
+ +### 3. In the first terminal, activate experimental features and run the director: + +```sh +fx experimental activate +cd director +./start_director.sh +``` + +
+ +### 4. In the second and third terminals, run the envoys: + +#### 4.1 Second terminal +```sh +cd Portland +./start_envoy.sh Portland Portland_config.yaml +``` + +#### 4.2 Third terminal +```sh +cd Seattle +./start_envoy.sh Seattle Seattle_config.yaml +``` + +
+ +### 5. Now that your director and envoy terminals are set up, run the Jupyter Notebook in your experiment terminal: + +```sh +cd workspace +jupyter execute HF_FederatedRuntime.ipynb +``` diff --git a/openfl-tutorials/experimental/workflow/FederatedRuntime/HuggingFace/Seattle/Seattle_config.yaml b/openfl-tutorials/experimental/workflow/FederatedRuntime/HuggingFace/Seattle/Seattle_config.yaml new file mode 100755 index 0000000000..a9f87c0a26 --- /dev/null +++ b/openfl-tutorials/experimental/workflow/FederatedRuntime/HuggingFace/Seattle/Seattle_config.yaml @@ -0,0 +1,6 @@ +settings: + director_host: localhost + director_port: 50050 + +Seattle: + private_attributes: private_attributes.seattle_attrs diff --git a/openfl-tutorials/experimental/workflow/FederatedRuntime/HuggingFace/Seattle/data/.keep b/openfl-tutorials/experimental/workflow/FederatedRuntime/HuggingFace/Seattle/data/.keep new file mode 100644 index 0000000000..e69de29bb2 diff --git a/openfl-tutorials/experimental/workflow/FederatedRuntime/HuggingFace/Seattle/private_attributes.py b/openfl-tutorials/experimental/workflow/FederatedRuntime/HuggingFace/Seattle/private_attributes.py new file mode 100644 index 0000000000..9984df6d22 --- /dev/null +++ b/openfl-tutorials/experimental/workflow/FederatedRuntime/HuggingFace/Seattle/private_attributes.py @@ -0,0 +1,6 @@ +from datasets import load_from_disk + +seattle_attrs = { + "train_dataset": load_from_disk("../data/imdb_train_seattle"), + "test_dataset": load_from_disk("../data/imdb_test_seattle"), +} diff --git a/openfl-tutorials/experimental/workflow/FederatedRuntime/HuggingFace/Seattle/start_envoy.sh b/openfl-tutorials/experimental/workflow/FederatedRuntime/HuggingFace/Seattle/start_envoy.sh new file mode 100755 index 0000000000..5c2d296cda --- /dev/null +++ b/openfl-tutorials/experimental/workflow/FederatedRuntime/HuggingFace/Seattle/start_envoy.sh @@ -0,0 +1,6 @@ +#!/bin/bash +set -e +ENVOY_NAME=$1 +ENVOY_CONF=$2 + +fx envoy start -n "$ENVOY_NAME" 
--disable-tls -c "$ENVOY_CONF" diff --git a/openfl-tutorials/experimental/workflow/FederatedRuntime/HuggingFace/director/director_config.yaml b/openfl-tutorials/experimental/workflow/FederatedRuntime/HuggingFace/director/director_config.yaml new file mode 100755 index 0000000000..021cfc59c9 --- /dev/null +++ b/openfl-tutorials/experimental/workflow/FederatedRuntime/HuggingFace/director/director_config.yaml @@ -0,0 +1,4 @@ +settings: + listen_host: localhost + listen_port: 50050 + envoy_health_check_period: 5 # in seconds \ No newline at end of file diff --git a/openfl-tutorials/experimental/workflow/FederatedRuntime/HuggingFace/director/start_director.sh b/openfl-tutorials/experimental/workflow/FederatedRuntime/HuggingFace/director/start_director.sh new file mode 100755 index 0000000000..5806a6cc0a --- /dev/null +++ b/openfl-tutorials/experimental/workflow/FederatedRuntime/HuggingFace/director/start_director.sh @@ -0,0 +1,4 @@ +#!/bin/bash +set -e + +fx director start --disable-tls -c director_config.yaml \ No newline at end of file diff --git a/openfl-tutorials/experimental/workflow/FederatedRuntime/HuggingFace/workspace/HF_FederatedRuntime.ipynb b/openfl-tutorials/experimental/workflow/FederatedRuntime/HuggingFace/workspace/HF_FederatedRuntime.ipynb new file mode 100644 index 0000000000..cb057a02e6 --- /dev/null +++ b/openfl-tutorials/experimental/workflow/FederatedRuntime/HuggingFace/workspace/HF_FederatedRuntime.ipynb @@ -0,0 +1,789 @@ +{ + "cells": [ + { + "attachments": {}, + "cell_type": "markdown", + "id": "dc13070c", + "metadata": {}, + "source": [ + "# Federated Runtime: HuggingFace Fine-Tuning" + ] + }, + { + "cell_type": "markdown", + "id": "cbe52a4e", + "metadata": {}, + "source": [ + "In this notebook, you will learn how to fine-tune a text-classification model on the IMDb dataset using Federated Learning.\n", + "\n", + "We will begin by simulating the entire workflow locally (using `LocalRuntime`), then deploy that same workflow into a Federated 
infrastructure (using `FederatedRuntime`).\n" + ] + }, + { + "cell_type": "markdown", + "id": "b0f023eb", + "metadata": {}, + "source": [ + "**Note:**\n", + "\n", + "Cells marked with the `#| export` directive will be automatically exported to the FL workspace. This workspace is then shared with all Federated Learning clients for execution.\n", + "\n", + "The export directive is only required when using the `FederatedRuntime`." + ] + }, + { + "cell_type": "markdown", + "id": "b3b0701e", + "metadata": {}, + "source": [ + "# Getting Started" + ] + }, + { + "cell_type": "markdown", + "id": "b62ffd86", + "metadata": {}, + "source": [ + "In the following cell `#| default_exp` experiment directive sets the name of the python module as `experiment`. This name can be customized according to the user’s requirements and preferences." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d79eacbd", + "metadata": {}, + "outputs": [], + "source": [ + "# | default_exp experiment" + ] + }, + { + "cell_type": "markdown", + "id": "d109332c", + "metadata": {}, + "source": [ + "### Installing requirements\n", + "\n", + "We begin with installing the required packages and dependencies" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f7475cba", + "metadata": {}, + "outputs": [], + "source": [ + "# | export\n", + "\n", + "%pip install git+https://github.com/securefederatedai/openfl.git\n", + "%pip install -r ../../../workflow_interface_requirements.txt\n", + "%pip install -U datasets==3.0.0\n", + "%pip install -U transformers==4.44.2\n", + "%pip install -U evaluate==0.4.3\n", + "%pip install -U ipywidgets\n", + "%pip install -U torch==2.4.1\n", + "%pip install -U accelerate==0.34.2\n", + "%pip install -U termcolor" + ] + }, + { + "cell_type": "markdown", + "id": "df919206", + "metadata": {}, + "source": [ + "### Defining global variables and functions\n", + "\n", + "Next, we define hyperparameters and helper functions that are required throughout 
this tutorial" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9bd8ac2d", + "metadata": {}, + "outputs": [], + "source": [ + "# | export\n", + "\n", + "import evaluate\n", + "import numpy as np\n", + "from transformers import AutoTokenizer\n", + "\n", + "# Hyperparameters\n", + "RANDOM_SEED = 12345\n", + "MODEL_NAME = \"prajjwal1/bert-tiny\"\n", + "NUM_LABELS = 2\n", + "MAX_MODEL_LEN = 512\n", + "LEARNING_RATE = 2e-5\n", + "WEIGHT_DECAY = 0.01\n", + "PER_DEVICE_BATCH = 512\n", + "AUTO_FIND_BATCH_SIZE = True\n", + "NUM_TRAIN_EPOCHS = 3\n", + "FL_ROUNDS = 2\n", + "\n", + "# Load model and tokenizer\n", + "tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)\n", + "\n", + "\n", + "# Tokenization function\n", + "def tokenize_function(examples):\n", + " return tokenizer(\n", + " examples[\"text\"],\n", + " padding=\"max_length\",\n", + " truncation=True,\n", + " max_length=MAX_MODEL_LEN,\n", + " )\n", + "\n", + "\n", + "# Accuracy metrics\n", + "def compute_metrics(eval_pred):\n", + " accuracy_metric = evaluate.load(\"accuracy\")\n", + " f1_metric = evaluate.load(\"f1\")\n", + "\n", + " preds = np.argmax(eval_pred.predictions, axis=1)\n", + "\n", + " acc = accuracy_metric.compute(\n", + " predictions=preds,\n", + " references=eval_pred.label_ids,\n", + " )[\"accuracy\"]\n", + " f1 = f1_metric.compute(\n", + " predictions=preds,\n", + " references=eval_pred.label_ids,\n", + " average=\"weighted\",\n", + " )[\"f1\"]\n", + "\n", + " return {\"accuracy\": acc, \"f1\": f1}" + ] + }, + { + "cell_type": "markdown", + "id": "4770fe7c", + "metadata": {}, + "source": [ + "### Defining Federated Averaging function\n", + "\n", + "Next, we define a helper function for averaging the weights of models" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "89cf4866", + "metadata": {}, + "outputs": [], + "source": [ + "# | export\n", + "\n", + "import torch\n", + "\n", + "\n", + "def fed_avg(agg_model, client_models, weights=None):\n", + " 
client_state_dicts = [m.state_dict() for m in client_models]\n", + " agg_state_dict = agg_model.state_dict()\n", + " device = next(agg_model.parameters()).device\n", + " dtype = next(agg_model.parameters()).dtype\n", + "\n", + " if weights is None:\n", + " num_models = len(client_models)\n", + " weights = torch.ones(num_models, dtype=dtype, device=device) / num_models\n", + " else:\n", + " weights = torch.tensor(weights, dtype=dtype, device=device)\n", + "\n", + " with torch.no_grad():\n", + " for key in agg_state_dict:\n", + " stacked_tensors = torch.stack(\n", + " [sd[key].to(device) for sd in client_state_dicts],\n", + " dim=0,\n", + " )\n", + "\n", + " w = weights.view(-1, *[1] * (stacked_tensors.dim() - 1))\n", + " avg_tensor = torch.sum(stacked_tensors * w, dim=0)\n", + "\n", + " agg_state_dict[key] = avg_tensor\n", + "\n", + " agg_model.load_state_dict(agg_state_dict)\n", + " return agg_model" + ] + }, + { + "cell_type": "markdown", + "id": "2a9d8a60", + "metadata": {}, + "source": [ + "### Defining the federated workflow\n", + "\n", + "Next, we define the federated workflow that contains validation and training processes\n", + "\n", + "- FLSpec – Defines the flow specification. 
User defined flows are subclasses of this.\n", + "- aggregator/collaborator - placement decorators that define where the task will be assigned" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "52c4a752", + "metadata": {}, + "outputs": [], + "source": [ + "# | export\n", + "\n", + "from termcolor import colored\n", + "from transformers import AutoModelForSequenceClassification, Trainer, TrainingArguments, set_seed\n", + "\n", + "from openfl.experimental.workflow.interface import FLSpec\n", + "from openfl.experimental.workflow.placement import aggregator, collaborator\n", + "\n", + "\n", + "class FederatedFlowHF(FLSpec):\n", + " \"\"\"\n", + " This Flow fine-tunes a text classification model from HuggingFace.\n", + " \"\"\"\n", + "\n", + " def __init__(self, model=None, **kwargs):\n", + " super().__init__(**kwargs)\n", + "\n", + " if model is not None:\n", + " self.model = model\n", + " else:\n", + " set_seed(RANDOM_SEED)\n", + " self.model = AutoModelForSequenceClassification.from_pretrained(\n", + " MODEL_NAME,\n", + " num_labels=NUM_LABELS,\n", + " )\n", + "\n", + " self.rounds = FL_ROUNDS\n", + " self.results = []\n", + "\n", + " @aggregator\n", + " def start(self):\n", + " \"\"\"\n", + " This is the start of the Flow.\n", + " \"\"\"\n", + " tag = colored(\"[Aggregator]\", \"white\", \"on_magenta\")\n", + " print(tag, \"Initializing Workflow ...\")\n", + "\n", + " self.collaborators = self.runtime.collaborators\n", + " self.current_round = 0\n", + "\n", + " self.next(self.aggregated_model_validation, foreach=\"collaborators\")\n", + "\n", + " @collaborator\n", + " def aggregated_model_validation(self):\n", + " \"\"\"\n", + " Perform validation of aggregated model on collaborators.\n", + " \"\"\"\n", + " tag = colored(f\"[Collab: {self.input}]\", \"white\", \"on_blue\")\n", + " print(tag, \"Performing Validation on aggregated model ...\")\n", + "\n", + " test_ds = self.test_dataset\n", + " tokenized_test = test_ds.map(tokenize_function, 
batched=True, remove_columns=[\"text\"])\n", + "\n", + " eval_args = TrainingArguments(\n", + " output_dir=\"trainer_output\",\n", + " per_device_eval_batch_size=PER_DEVICE_BATCH,\n", + " auto_find_batch_size=AUTO_FIND_BATCH_SIZE,\n", + " do_train=False,\n", + " do_eval=True,\n", + " logging_strategy=\"no\",\n", + " save_strategy=\"no\",\n", + " report_to=[],\n", + " )\n", + " trainer = Trainer(\n", + " model=self.model,\n", + " args=eval_args,\n", + " eval_dataset=tokenized_test,\n", + " compute_metrics=compute_metrics,\n", + " )\n", + "\n", + " eval_metrics = trainer.evaluate()\n", + " self.agg_validation_accuracy = eval_metrics[\"eval_accuracy\"]\n", + " self.agg_validation_f1 = eval_metrics[\"eval_f1\"]\n", + " print(\n", + " tag,\n", + " f\"Aggregated Model validation accuracy = {self.agg_validation_accuracy:.4f}\",\n", + " f\"F1 = {self.agg_validation_f1:.4f}\",\n", + " )\n", + "\n", + " self.next(self.train)\n", + "\n", + " @collaborator\n", + " def train(self):\n", + " \"\"\"\n", + " Train model on Local collaborator dataset.\n", + " \"\"\"\n", + " tag = colored(f\"[Collab: {self.input}]\", \"white\", \"on_blue\")\n", + " print(tag, \"Training Model on local dataset ...\")\n", + "\n", + " train_ds = self.train_dataset\n", + " tokenized_train = train_ds.map(tokenize_function, batched=True, remove_columns=[\"text\"])\n", + "\n", + " train_args = TrainingArguments(\n", + " output_dir=\"trainer_output\",\n", + " eval_strategy=\"no\",\n", + " learning_rate=LEARNING_RATE,\n", + " per_device_train_batch_size=PER_DEVICE_BATCH,\n", + " auto_find_batch_size=AUTO_FIND_BATCH_SIZE,\n", + " num_train_epochs=NUM_TRAIN_EPOCHS,\n", + " weight_decay=WEIGHT_DECAY,\n", + " logging_strategy=\"no\",\n", + " save_strategy=\"no\",\n", + " report_to=[],\n", + " )\n", + "\n", + " trainer = Trainer(\n", + " model=self.model,\n", + " args=train_args,\n", + " train_dataset=tokenized_train,\n", + " )\n", + "\n", + " train_output = trainer.train()\n", + " self.loss = 
train_output.training_loss\n", + " print(tag, f\"Local training loss = {self.loss:.4f}\")\n", + "\n", + " self.next(self.local_model_validation)\n", + "\n", + " @collaborator\n", + " def local_model_validation(self):\n", + " \"\"\"\n", + " Validate locally trained model.\n", + " \"\"\"\n", + " tag = colored(f\"[Collab: {self.input}]\", \"white\", \"on_blue\")\n", + " print(tag, \"Performing Validation on locally trained model ...\")\n", + "\n", + " test_ds = self.test_dataset\n", + " tokenized_test = test_ds.map(tokenize_function, batched=True, remove_columns=[\"text\"])\n", + "\n", + " eval_args = TrainingArguments(\n", + " output_dir=\"trainer_output\",\n", + " per_device_eval_batch_size=PER_DEVICE_BATCH,\n", + " auto_find_batch_size=AUTO_FIND_BATCH_SIZE,\n", + " do_train=False,\n", + " do_eval=True,\n", + " logging_strategy=\"no\",\n", + " save_strategy=\"no\",\n", + " report_to=[],\n", + " )\n", + " trainer = Trainer(\n", + " model=self.model,\n", + " args=eval_args,\n", + " eval_dataset=tokenized_test,\n", + " compute_metrics=compute_metrics,\n", + " )\n", + "\n", + " eval_metrics = trainer.evaluate()\n", + " self.local_validation_accuracy = eval_metrics[\"eval_accuracy\"]\n", + " self.local_validation_f1 = eval_metrics[\"eval_f1\"]\n", + " print(\n", + " tag,\n", + " f\"Local model validation accuracy = {self.local_validation_accuracy:.4f}\",\n", + " f\"F1 = {self.local_validation_f1:.4f}\",\n", + " )\n", + " # Leave agg_validation_accuracy as set in aggregated_model_validation; join() averages it.\n", + "\n", + " self.next(self.join)\n", + "\n", + " @aggregator\n", + " def join(self, inputs):\n", + " \"\"\"\n", + " Model aggregation step.\n", + " \"\"\"\n", + " tag = colored(\"[Aggregator]\", \"white\", \"on_magenta\")\n", + " print(tag, \"Joining models from collaborators ...\")\n", + "\n", + " # Average Training loss, aggregated and locally trained model accuracy\n", + " sum_loss = 0.0\n", + " sum_agg_acc = 0.0\n", + " sum_agg_f1 = 0.0\n", + " sum_loc_acc = 0.0\n", + " sum_loc_f1 = 
0.0\n", + " n = len(inputs)\n", + "\n", + " for inp in inputs:\n", + " sum_loss += inp.loss\n", + " sum_agg_acc += inp.agg_validation_accuracy\n", + " sum_agg_f1 += inp.agg_validation_f1\n", + " sum_loc_acc += inp.local_validation_accuracy\n", + " sum_loc_f1 += inp.local_validation_f1\n", + "\n", + " self.average_loss = sum_loss / n\n", + " self.aggregated_model_accuracy = sum_agg_acc / n\n", + " self.aggregated_model_f1 = sum_agg_f1 / n\n", + " self.local_model_accuracy = sum_loc_acc / n\n", + " self.local_model_f1 = sum_loc_f1 / n\n", + "\n", + " print(tag, f\"Round {self.current_round}:\")\n", + " print(f\"\\tAvg. aggregated model validation accuracy = {self.aggregated_model_accuracy:.4f}\")\n", + " print(f\"\\tAvg. aggregated model validation f1 = {self.aggregated_model_f1:.4f}\")\n", + " print(f\"\\tAvg. training loss = {self.average_loss:.4f}\")\n", + " print(f\"\\tAvg. local model validation accuracy = {self.local_model_accuracy:.4f}\")\n", + " print(f\"\\tAvg. local model validation f1 = {self.local_model_f1:.4f}\")\n", + "\n", + " # Averaging weights\n", + " self.model = fed_avg(self.model, [inp.model for inp in inputs])\n", + "\n", + " self.results.append(\n", + " [\n", + " self.current_round,\n", + " self.aggregated_model_accuracy,\n", + " self.aggregated_model_f1,\n", + " self.average_loss,\n", + " self.local_model_accuracy,\n", + " self.local_model_f1,\n", + " ],\n", + " )\n", + "\n", + " self.current_round += 1\n", + "\n", + " if self.current_round < self.rounds:\n", + " self.next(self.aggregated_model_validation, foreach=\"collaborators\")\n", + "\n", + " else:\n", + " self.next(self.end)\n", + "\n", + " @aggregator\n", + " def end(self):\n", + " \"\"\"\n", + " This is the last step in the Flow.\n", + " \"\"\"\n", + " tag = colored(\"[Aggregator]\", \"white\", \"on_magenta\")\n", + " print(tag, \"This is the end of the flow\")" + ] + }, + { + "cell_type": "markdown", + "id": "b0757812", + "metadata": {}, + "source": [ + "### Simulation: LocalRuntime" 
+ ] + }, + { + "cell_type": "markdown", + "id": "3bccffd7", + "metadata": {}, + "source": [ + "We now import & define the `LocalRuntime`, participants (`Aggregator/Collaborator`), and initialize the private attributes for participants.\n", + "\n", + "- `Runtime` – Defines where the flow runs. `LocalRuntime` simulates the flow on local node.\n", + "- `Aggregator/Collaborator` - (Local) Participants in the simulation\n", + "\n", + "Since this cell is used for simulation, we don't use the export directive." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "bffcc141", + "metadata": {}, + "outputs": [], + "source": [ + "from datasets import load_dataset\n", + "\n", + "from openfl.experimental.workflow.interface import Aggregator, Collaborator\n", + "from openfl.experimental.workflow.runtime import LocalRuntime\n", + "\n", + "# Setup Aggregator & initialize private attributes\n", + "agg = Aggregator()\n", + "agg.private_attributes = {}\n", + "\n", + "# Setup Collaborators & initialize shards of the IMDb dataset as private attributes\n", + "n_collaborators = 2\n", + "collaborator_names = [\"Portland\", \"Seattle\"]\n", + "\n", + "# Load imdb dataset\n", + "imdb_dataset = load_dataset(\"imdb\")\n", + "\n", + "# Split dataset between collaborators\n", + "collaborators = [Collaborator(name=name) for name in collaborator_names]\n", + "for idx, collab in enumerate(collaborators):\n", + " local_train = imdb_dataset[\"train\"].select(\n", + " list(\n", + " range(idx, len(imdb_dataset[\"train\"]), n_collaborators),\n", + " ),\n", + " )\n", + " local_test = imdb_dataset[\"test\"].select(\n", + " list(\n", + " range(idx, len(imdb_dataset[\"test\"]), n_collaborators),\n", + " ),\n", + " )\n", + "\n", + " collab.private_attributes = {\n", + " \"train_dataset\": local_train,\n", + " \"test_dataset\": local_test,\n", + " }\n", + "\n", + "local_runtime = LocalRuntime(\n", + " aggregator=agg,\n", + " collaborators=collaborators,\n", + " backend=\"single_process\",\n", 
+ ")\n", + "print(f\"Local runtime collaborators = {local_runtime.collaborators}\")" + ] + }, + { + "cell_type": "markdown", + "id": "78819357", + "metadata": {}, + "source": [ + "### Start Simulation" + ] + }, + { + "cell_type": "markdown", + "id": "3a2675ba", + "metadata": {}, + "source": [ + "Now that we have our flow and runtime defined, let's run the simulation! " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e5f10d5d", + "metadata": {}, + "outputs": [], + "source": [ + "model = None\n", + "flflow = FederatedFlowHF(model, checkpoint=True)\n", + "flflow.runtime = local_runtime\n", + "flflow.run()" + ] + }, + { + "cell_type": "markdown", + "id": "50300fed", + "metadata": {}, + "source": [ + "Let us check the simulation results" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a5d77540", + "metadata": {}, + "outputs": [], + "source": [ + "from tabulate import tabulate\n", + "\n", + "simulation_results = flflow.results\n", + "headers = [\n", + " \"Rounds\",\n", + " \"Agg Model Validation Accuracy\",\n", + " \"Agg Model Validation F1\",\n", + " \"Local Train loss\",\n", + " \"Local Model Validation Accuracy\",\n", + " \"Local Model Validation F1\",\n", + "]\n", + "\n", + "print(\"********** Simulation results **********\")\n", + "print(tabulate(simulation_results, headers=headers, tablefmt=\"outline\"))" + ] + }, + { + "cell_type": "markdown", + "id": "b5371b6d", + "metadata": {}, + "source": [ + "### Setup Federation: Director & Envoys" + ] + }, + { + "cell_type": "markdown", + "id": "f270e385", + "metadata": {}, + "source": [ + "Before we can deploy the experiment, let us create participants in Federation: Director and Envoys. As the Tutorial uses two collaborators we shall launch three participants:\n", + "1. Director: The central node in the Federation\n", + "2. Portland: The first envoy in the Federation\n", + "3. 
Seattle: The second envoy in the Federation \n", + "\n", + "The participants can be launched by following the steps in this tutorial's [README](../README.md)\n" + ] + }, + { + "cell_type": "markdown", + "id": "f9d556d0", + "metadata": {}, + "source": [ + "### Deploy: FederatedRuntime" + ] + }, + { + "cell_type": "markdown", + "id": "5ffd73b6", + "metadata": {}, + "source": [ + "We now import and instantiate `FederatedRuntime` to enable deployment of experiment on distributed infrastructure. Initializing the `FederatedRuntime` requires following inputs to be provided by the user:\n", + "\n", + "- `director_info` – director information including fqdn of the director node, port, and certificate information\n", + "- `collaborators` - names of the collaborators participating in experiment\n", + "- `notebook_path`- path to this jupyter notebook\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1715a373", + "metadata": {}, + "outputs": [], + "source": [ + "# | export\n", + "\n", + "from openfl.experimental.workflow.runtime import FederatedRuntime\n", + "\n", + "director_info = {\n", + " \"director_node_fqdn\": \"localhost\",\n", + " \"director_port\": 50050,\n", + "}\n", + "\n", + "federated_runtime = FederatedRuntime(\n", + " collaborators=collaborator_names,\n", + " director=director_info,\n", + " notebook_path=\"./HF_FederatedRuntime.ipynb\",\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "58d22bbb", + "metadata": {}, + "source": [ + "Let us connect to federation & check if the envoys are connected to the director by using the `get_envoys` method of `FederatedRuntime`. 
If the participants were launched successfully in the previous step, the status of `Portland` and `Seattle` should be displayed as `Online`" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1f1be87f", + "metadata": {}, + "outputs": [], + "source": [ + "federated_runtime.get_envoys()" + ] + }, + { + "cell_type": "markdown", + "id": "87c487cb", + "metadata": {}, + "source": [ + "Now that we have our distributed infrastructure ready, let us modify the flow runtime to `FederatedRuntime` instance and deploy the experiment. \n", + "\n", + "Progress of the flow is available on \n", + "1. Jupyter notebook: if `checkpoint` attribute of the flow object is set to `True`\n", + "2. Director and Envoy terminals \n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9509fe8e", + "metadata": {}, + "outputs": [], + "source": [ + "# Load imdb dataset\n", + "imdb_dataset = load_dataset(\"imdb\")\n", + "collabs = [\"Portland\", \"Seattle\"]\n", + "\n", + "for idx, c in enumerate(collabs):\n", + " local_train = imdb_dataset[\"train\"].select(list(range(idx, len(imdb_dataset[\"train\"]), 2)))\n", + " local_test = imdb_dataset[\"test\"].select(list(range(idx, len(imdb_dataset[\"test\"]), 2)))\n", + "\n", + " local_train.save_to_disk(f\"../{c}/data/imdb_train_{c.lower()}\")\n", + " local_test.save_to_disk(f\"../{c}/data/imdb_test_{c.lower()}\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c6d19819", + "metadata": {}, + "outputs": [], + "source": [ + "flflow.results = [] # clear results from previous run\n", + "flflow.runtime = federated_runtime\n", + "flflow.run()" + ] + }, + { + "cell_type": "markdown", + "id": "5e5ef3ea", + "metadata": {}, + "source": [ + "Let us compare the simulation results from `LocalRuntime` and federation results from `FederatedRuntime`" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f4b63ce0", + "metadata": {}, + "outputs": [], + "source": [ + "headers = [\n", + " 
\"Rounds\",\n", + " \"Agg Model Validation Accuracy\",\n", + " \"Agg Model Validation F1\",\n", + " \"Local Train loss\",\n", + " \"Local Model Validation Accuracy\",\n", + " \"Local Model Validation F1\",\n", + "]\n", + "\n", + "print(\"********** Simulation results **********\")\n", + "print(tabulate(simulation_results, headers=headers, tablefmt=\"outline\"))\n", + "\n", + "print(\"********** Federation results **********\")\n", + "federation_results = flflow.results\n", + "print(tabulate(federation_results, headers=headers, tablefmt=\"outline\"))" + ] + }, + { + "cell_type": "markdown", + "id": "0568e9a5", + "metadata": {}, + "source": [ + "### Remove downloaded and generated files" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "bc10a411", + "metadata": {}, + "outputs": [], + "source": [ + "import shutil\n", + "\n", + "for c in [\"Portland\", \"Seattle\"]:\n", + " shutil.rmtree(f\"../{c}/__pycache__\")\n", + " shutil.rmtree(f\"../{c}/data/imdb_train_{c.lower()}\")\n", + " shutil.rmtree(f\"../{c}/data/imdb_test_{c.lower()}\")\n", + "\n", + "shutil.rmtree(\"trainer_output\")\n", + "shutil.rmtree(\"generated_workspace\")" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "openfl", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.13" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +}