實作自訂彙總

在 TensorFlow.org 上檢視 在 Google Colab 中執行 在 GitHub 上檢視原始碼 下載筆記本

在本教學課程中,我們將說明 tff.aggregators 模組背後的設計原則,以及實作從用戶端到伺服器自訂值彙總的最佳實務。

先決條件。 本教學課程假設您已熟悉 聯邦核心 的基本概念,例如位置 (tff.SERVERtff.CLIENTS)、TFF 如何表示運算 (tff.tensorflow.computationtff.federated_computation) 及其類型簽名。

pip install --quiet --upgrade tensorflow-federated

設計摘要

在 TFF 中,「彙總」指的是將 tff.CLIENTS 上的一組值移動到 tff.SERVER,以產生相同類型的彙總值。也就是說,不需要提供每個個別的用戶端值。例如,在聯邦式學習中,用戶端模型更新會進行平均,以取得彙總模型更新,並套用至伺服器上的全域模型。

除了完成此目標的運算子 (例如 tff.federated_sum) 之外,TFF 還提供 tff.templates.AggregationProcess (具狀態處理程序),後者將彙總運算的類型簽名形式化,使其可以推廣到比簡單總和更複雜的形式。

tff.aggregators 模組的主要元件是用於建立 AggregationProcess工廠,其設計目的是在以下兩個方面成為 TFF 中普遍有用且可取代的建構區塊

  1. 參數化運算。 彙總是獨立的建構區塊,可以插入到其他設計為與 tff.aggregators 搭配使用的 TFF 模組中,以參數化其必要的彙總。

範例

learning_process = tff.learning.algorithms.build_weighted_fed_avg(
    ...,
    model_aggregator=tff.aggregators.MeanFactory())
  1. 彙總組合。 彙總建構區塊可以與其他彙總建構區塊組合,以建立更複雜的複合彙總。

範例

secure_mean = tff.aggregators.MeanFactory(
    value_sum_factory=tff.aggregators.SecureSumFactory(...))

本教學課程的其餘部分將說明如何達成這兩個目標。

彙總處理程序

我們首先總結 tff.templates.AggregationProcess,然後說明其建立的工廠模式。

tff.templates.AggregationProcesstff.templates.MeasuredProcess,具有為彙總指定的類型簽名。特別是,initializenext 函式具有以下類型簽名

  • ( -> state_type@SERVER)
  • (<state_type@SERVER, {value_type}@CLIENTS, *> -> <state_type@SERVER, value_type@SERVER, measurements_type@SERVER>)

狀態 (類型為 state_type) 必須放置在伺服器上。next 函式接受狀態和要彙總的值 (類型為 value_type,放置在用戶端) 作為輸入引數。* 表示選用的其他輸入引數,例如加權平均中的權重。它會傳回更新的狀態物件、放置在伺服器上的相同類型的彙總值,以及一些測量值。

請注意,在 next 函式的執行之間傳遞的狀態,以及旨在報告取決於 next 函式特定執行的任何資訊的報告測量值,都可能是空的。儘管如此,它們必須明確指定,以便 TFF 的其他部分能夠遵循明確的合約。

其他 TFF 模組 (例如 tff.learning 中的模型更新) 預期會使用 tff.templates.AggregationProcess 來參數化值的彙總方式。但是,究竟彙總哪些值以及其類型簽名是什麼,取決於正在訓練的模型和其他細節,以及用於執行訓練的學習演算法。

為了使彙總獨立於運算的其餘方面,我們使用工廠模式 - 一旦可使用要彙總的物件的相關類型簽名,我們就會透過叫用工廠的 create 方法來建立適當的 tff.templates.AggregationProcess。因此,只有程式庫作者才需要直接處理彙總處理程序,他們負責建立此處理程序。

彙總處理程序工廠

有兩個用於未加權和加權彙總的抽象基本工廠類別。它們的 create 方法會取得要彙總的值的類型簽名,並傳回 tff.templates.AggregationProcess,以用於彙總此類值。

tff.aggregators.UnweightedAggregationFactory 建立的處理程序會採用兩個輸入引數:(1) 伺服器上的狀態,以及 (2) 指定類型 value_type 的值。

範例實作是 tff.aggregators.SumFactory

tff.aggregators.WeightedAggregationFactory 建立的處理程序會採用三個輸入引數:(1) 伺服器上的狀態、(2) 指定類型 value_type 的值,以及 (3) 類型 weight_type 的權重,如工廠使用者在叫用其 create 方法時所指定。

範例實作是 tff.aggregators.MeanFactory,後者會計算加權平均值。

工廠模式是我們如何達成上述第一個目標的方式;即彙總是獨立的建構區塊。例如,當變更哪些模型變數是可訓練時,複雜的彙總不一定需要變更;當方法 (例如 tff.learning.algorithms.build_weighted_fed_avg) 使用它時,代表它的工廠將以不同的類型簽名叫用。

組合

回想一下,一般彙總處理程序可以封裝 (a) 用戶端值的某些預先處理、(b) 值從用戶端到伺服器的移動,以及 (c) 伺服器上彙總值的某些後處理。tff.aggregators 模組內透過建構彙總工廠的實作來實現上述第二個目標 (彙總組合),以便將部分 (b) 委派給另一個彙總工廠。

實作預設會著重於與彙總相關的單一面向,而不是在單一工廠類別中實作所有必要的邏輯。在需要時,此模式可讓我們一次更換一個建構區塊。

範例是加權 tff.aggregators.MeanFactory。其實作會將提供的數值和權重在用戶端相乘,然後獨立加總加權值和權重,最後將加權值的總和除以伺服器上的權重總和。總和不是透過直接使用 tff.federated_sum 運算子來實作,而是委派給 tff.aggregators.SumFactory 的兩個執行個體。

這種結構使兩個預設總和可以替換為不同的工廠,這些工廠以不同的方式實現總和。例如,tff.aggregators.SecureSumFactory,或 tff.aggregators.UnweightedAggregationFactory 的自訂實作。反之亦然,tff.aggregators.MeanFactory 本身可以是另一個工廠 (例如 tff.aggregators.clipping_factory) 的內部彙總,前提是要在平均之前裁剪這些值。

如需使用 tff.aggregators 模組中現有工廠的組合機制的建議用法,請參閱先前的調整建議的學習彙總教學課程。

範例最佳實務

我們將透過實作一個簡單的範例工作,逐步使其更通用,來詳細說明 tff.aggregators 概念。另一種學習方式是查看現有工廠的實作。

import collections

import numpy as np
import tensorflow as tf
import tensorflow_federated as tff

範例工作不是加總 value,而是加總 value * 2.0,然後將總和除以 2.0。因此,彙總結果在數學上等同於直接加總 value,並且可以認為由三個部分組成:(1) 用戶端縮放 (2) 跨用戶端加總 (3) 伺服器取消縮放。

依照上面說明的設計,邏輯將實作為 tff.aggregators.UnweightedAggregationFactory 的子類別,後者會在給定要彙總的 value_type 時建立適當的 tff.templates.AggregationProcess

最小實作

對於範例工作,必要的運算始終相同,因此不需要使用狀態。因此,它是空的,並表示為 tff.federated_value((), tff.SERVER)。測量值也相同,目前為止。

因此,工作的最小實作如下

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value((), tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.FederatedType(value_type, tff.CLIENTS))
    def next_fn(state, value):
      scaled_value = tff.federated_map(
          tff.tensorflow.computation(lambda x: x * 2.0), value)
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(
          tff.tensorflow.computation(lambda x: x / 2.0), summed_value)
      measurements = tff.federated_value((), tff.SERVER)
      return tff.templates.MeasuredProcessOutput(
          state=state, result=unscaled_value, measurements=measurements)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

可以使用以下程式碼驗證一切是否如預期般運作

client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(np.float32))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: {output.result}  (expected 8.0)')
Type signatures of the created aggregation process:

  - initialize: ( -> <>@SERVER)
  - next: (<state=<>@SERVER,value={float32}@CLIENTS> -> <state=<>@SERVER,result=float32@SERVER,measurements=<>@SERVER>)

Aggregation result: 8.0  (expected 8.0)

具狀態性和測量值

具狀態性廣泛用於 TFF 中,以表示預期會反覆執行且每次反覆運算都會變更的運算。例如,學習運算的狀態包含正在學習的模型權重。

為了說明如何在彙總運算中使用狀態,我們修改範例工作。我們不是將 value 乘以 2.0,而是將其乘以反覆運算索引 - 彙總已執行的次數。

為此,我們需要一種方法來追蹤反覆運算索引,這可以透過狀態的概念來實現。在 initialize_fn 中,我們不是建立空狀態,而是將狀態初始化為純量零。然後,可以在 next_fn 中分三個步驟使用狀態:(1) 遞增 1.0,(2) 用於乘以 value,以及 (3) 作為新的更新狀態傳回。

完成此操作後,您可能會注意到:但是可以使用與上述完全相同的程式碼來驗證一切是否如預期般運作。我如何知道某些東西實際上已變更?

好問題!這就是測量值的概念變得有用的地方。一般而言,測量值可以報告與 next 函式的單次執行相關的任何值,這些值可用於監控。在這種情況下,它可以是先前範例中的 summed_value。也就是說,「取消縮放」步驟之前的值,該值應取決於反覆運算索引。同樣,這在實務中不一定有用,但說明了相關機制。

因此,工作的具狀態回應如下所示

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value(0.0, tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.FederatedType(value_type, tff.CLIENTS))
    def next_fn(state, value):
      new_state = tff.federated_map(
          tff.tensorflow.computation(lambda x: x + 1.0), state)
      state_at_clients = tff.federated_broadcast(new_state)
      scaled_value = tff.federated_map(
          tff.tensorflow.computation(lambda x, y: x * y), (value, state_at_clients))
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(
          tff.tensorflow.computation(lambda x, y: x / y), (summed_value, new_state))
      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=summed_value)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

請注意,作為輸入進入 next_fnstate 放置在伺服器上。為了在用戶端使用它,首先需要傳達它,這可以使用 tff.federated_broadcast 運算子來實現。

為了驗證一切是否如預期般運作,我們現在可以查看報告的 measurements,即使使用相同的 client_data 執行,每次執行回合也應該有所不同。

client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(np.float32))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}   (expected 8.0 * 1)')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}  (expected 8.0 * 2)')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #3')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}  (expected 8.0 * 3)')
Type signatures of the created aggregation process:

  - initialize: ( -> float32@SERVER)
  - next: (<state=float32@SERVER,value={float32}@CLIENTS> -> <state=float32@SERVER,result=float32@SERVER,measurements=float32@SERVER>)

| Round #1
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 8.0   (expected 8.0 * 1)

| Round #2
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 16.0  (expected 8.0 * 2)

| Round #3
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 24.0  (expected 8.0 * 3)

結構化類型

在聯邦式學習中訓練的模型的模型權重通常表示為張量的集合,而不是單個張量。在 TFF 中,這表示為 tff.StructType,並且一般有用的彙總工廠需要能夠接受結構化類型。

但是,在上面的範例中,我們僅使用 tff.TensorType 物件。如果我們嘗試使用先前的工廠來建立具有 tff.StructType([(np.float32, (2,)), (np.float32, (3,))]) 的彙總處理程序,我們會收到一個奇怪的錯誤,因為 TensorFlow 會嘗試將 tf.Tensorlist 相乘。

問題在於,我們需要將結構中的每個張量乘以常數,而不是將張量的結構乘以常數。此問題的常見解決方案是在建立的 tff.tensorflow.computation 中使用 tf.nest 模組。

因此,與結構化類型相容的先前 ExampleTaskFactory 版本如下所示

@tff.tensorflow.computation()
def scale(value, factor):
  return tf.nest.map_structure(lambda x: x * factor, value)

@tff.tensorflow.computation()
def unscale(value, factor):
  return tf.nest.map_structure(lambda x: x / factor, value)

@tff.tensorflow.computation()
def add_one(value):
  return value + 1.0

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value(0.0, tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.FederatedType(value_type, tff.CLIENTS))
    def next_fn(state, value):
      new_state = tff.federated_map(add_one, state)
      state_at_clients = tff.federated_broadcast(new_state)
      scaled_value = tff.federated_map(scale, (value, state_at_clients))
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(unscale, (summed_value, new_state))
      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=summed_value)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

此範例突顯了一種模式,該模式可能在建構 TFF 程式碼時很有用。當不處理非常簡單的運算時,當在單獨的位置建立將用作 tff.federated_computation 內建構區塊的 tff.tensorflow.computation 時,程式碼會變得更易於理解。在 tff.federated_computation 內部,這些建構區塊僅使用內建運算子連接。

為了驗證它是否如預期般運作

client_data = [[[1.0, 2.0], [3.0, 4.0, 5.0]],
               [[1.0, 1.0], [3.0, 0.0, -5.0]]]
factory = ExampleTaskFactory()
aggregation_process = factory.create(
    tff.to_type([(np.float32, (2,)), (np.float32, (3,))]))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: [{output.result[0]}, {output.result[1]}]\n'
      f'          Expected: [[2. 3.], [6. 4. 0.]]')
Type signatures of the created aggregation process:

  - initialize: ( -> float32@SERVER)
  - next: (<state=float32@SERVER,value={<float32[2],float32[3]>}@CLIENTS> -> <state=float32@SERVER,result=<float32[2],float32[3]>@SERVER,measurements=<float32[2],float32[3]>@SERVER>)

Aggregation result: [[2. 3.], [6. 4. 0.]]
          Expected: [[2. 3.], [6. 4. 0.]]

內部彙總

最後一步是選擇性地啟用將實際彙總委派給其他工廠,以便輕鬆組合不同的彙總技術。

這是透過在我們的 ExampleTaskFactory 的建構函式中建立選用的 inner_factory 引數來實現的。如果未指定,則使用 tff.aggregators.SumFactory,後者會套用直接在上一節中使用的 tff.federated_sum 運算子。

當呼叫 create 時,我們可以先呼叫 inner_factorycreate,以使用相同的 value_type 建立內部彙總處理程序。

initialize_fn 傳回的處理程序狀態由兩個部分組成:由「此」處理程序建立的狀態,以及剛建立的內部處理程序的狀態。

next_fn 的實作不同之處在於,實際彙總委派給內部處理程序的 next 函式,以及最終輸出如何組合。狀態再次由「此」和「內部」狀態組成,並且測量值以類似於 OrderedDict 的方式組合。

以下是此模式的實作。

@tff.tensorflow.computation()
def scale(value, factor):
  return tf.nest.map_structure(lambda x: x * factor, value)

@tff.tensorflow.computation()
def unscale(value, factor):
  return tf.nest.map_structure(lambda x: x / factor, value)

@tff.tensorflow.computation()
def add_one(value):
  return value + 1.0

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def __init__(self, inner_factory=None):
    if inner_factory is None:
      inner_factory = tff.aggregators.SumFactory()
    self._inner_factory = inner_factory

  def create(self, value_type):
    inner_process = self._inner_factory.create(value_type)

    @tff.federated_computation()
    def initialize_fn():
      my_state = tff.federated_value(0.0, tff.SERVER)
      inner_state = inner_process.initialize()
      return tff.federated_zip((my_state, inner_state))

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.FederatedType(value_type, tff.CLIENTS))
    def next_fn(state, value):
      my_state, inner_state = state
      my_new_state = tff.federated_map(add_one, my_state)
      my_state_at_clients = tff.federated_broadcast(my_new_state)
      scaled_value = tff.federated_map(scale, (value, my_state_at_clients))

      # Delegation to an inner factory, returning values placed at SERVER.
      inner_output = inner_process.next(inner_state, scaled_value)

      unscaled_value = tff.federated_map(unscale, (inner_output.result, my_new_state))

      new_state = tff.federated_zip((my_new_state, inner_output.state))
      measurements = tff.federated_zip(
          collections.OrderedDict(
              scaled_value=inner_output.result,
              example_task=inner_output.measurements))

      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=measurements)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

當委派給 inner_process.next 函式時,我們取得的傳回結構是 tff.templates.MeasuredProcessOutput,具有相同的三個欄位 - stateresultmeasurements。在建立組合彙總處理程序的整體傳回結構時,statemeasurements 欄位通常應組合在一起並傳回。相反地,result 欄位對應於正在彙總的值,而是「流經」組合彙總。

state 物件應視為工廠的實作細節,因此組合可以是任何結構。但是,measurements 對應於要在某個時間點報告給使用者的值。因此,我們建議使用 OrderedDict,並使用組合命名,使其清楚指出報告的指標來自組合中的哪個位置。

另請注意 tff.federated_zip 運算子的使用。由建立的處理程序控制的 state 物件應為 tff.FederatedType。如果我們改為在 initialize_fn 中傳回 (this_state, inner_state),則其傳回類型簽名將是包含 tff.FederatedType 的 2 元組的 tff.StructTypetff.federated_zip 的使用「提升」了最上層的 tff.FederatedType。這同樣用於 next_fn 中,以準備要傳回的狀態和測量值。

最後,我們可以看看如何將其與預設內部彙總搭配使用

client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(np.float32))
state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 8.0
| measurements['example_task']: ()

| Round #2
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 16.0
| measurements['example_task']: ()

... 以及與不同的內部彙總搭配使用。例如,ExampleTaskFactory

client_data = [1.0, 2.0, 5.0]
# Note the inner delegation can be to any UnweightedAggregaionFactory.
# In this case, each factory creates process that multiplies by the iteration
# index (1, 2, 3, ...), thus their combination multiplies by (1, 4, 9, ...).
factory = ExampleTaskFactory(ExampleTaskFactory())
aggregation_process = factory.create(tff.TensorType(np.float32))
state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 8.0
| measurements['example_task']: OrderedDict([('scaled_value', 8.0), ('example_task', ())])

| Round #2
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 16.0
| measurements['example_task']: OrderedDict([('scaled_value', 32.0), ('example_task', ())])

摘要

在本教學課程中,我們說明了為了建立通用彙總建構區塊 (表示為彙總工廠) 而應遵循的最佳實務。通用性透過以下兩種方式的設計意圖而來

  1. 參數化運算。 彙總是獨立的建構區塊,可以插入到其他設計為與 tff.aggregators 搭配使用的 TFF 模組中,以參數化其必要的彙總,例如 tff.learning.algorithms.build_weighted_fed_avg
  2. 彙總組合。 彙總建構區塊可以與其他彙總建構區塊組合,以建立更複雜的複合彙總。