This notebook teaches you how to train a pose classification model using MoveNet and TensorFlow Lite. The result is a new TensorFlow Lite model that accepts the output from the MoveNet model as its input, and outputs a pose classification, such as the name of a yoga pose.

The procedure in this notebook consists of 3 parts:

- Part 1: Preprocess the pose classification training data into a CSV file that specifies the landmarks (body keypoints) detected by the MoveNet model, along with the ground truth pose labels.
- Part 2: Build and train a pose classification model that takes the landmark coordinates from the CSV file as input, and outputs the predicted labels.
- Part 3: Convert the pose classification model to TFLite.

By default, this notebook uses an image dataset with labeled yoga poses, but we've also included a section in Part 1 where you can upload your own image dataset of poses.
Preparation

In this section, you'll import the required libraries and define several functions to preprocess the training images into a CSV file that contains the landmark coordinates and ground truth labels.

Nothing observable happens here, but you can expand the hidden code cells to see the implementation of some of the functions we'll be calling later.

If you just want to create the CSV file without knowing all the details, just run this section and proceed to Part 1.
!pip install -q opencv-python
import csv
import cv2
import itertools
import numpy as np
import pandas as pd
import os
import sys
import tempfile
import tqdm
from matplotlib import pyplot as plt
from matplotlib.collections import LineCollection
import tensorflow as tf
import tensorflow_hub as hub
from tensorflow import keras
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
Code to run pose estimation using MoveNet

Functions to run pose estimation with MoveNet

Functions to visualize the pose estimation results.

Code to load the images, detect pose landmarks and save them into a CSV file

(Optional) Code snippet to try out the Movenet pose estimation logic
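For reference, here's a minimal sketch of what such a snippet could look like. This is an illustrative sketch, not the hidden cells' exact code: it assumes the MoveNet singlepose Lightning model from TF Hub and a hypothetical image path, while the helpers above layer cropping and smoothing logic on top of the same idea.

movenet_module = hub.load('https://tfhub.dev/google/movenet/singlepose/lightning/4')
movenet_fn = movenet_module.signatures['serving_default']

# '/tmp/pose_sample.jpg' is a hypothetical path; use any pose image you have.
image = tf.io.decode_jpeg(tf.io.read_file('/tmp/pose_sample.jpg'))
input_image = tf.expand_dims(image, axis=0)
# The Lightning variant expects a 192x192 int32 input.
input_image = tf.cast(tf.image.resize_with_pad(input_image, 192, 192), dtype=tf.int32)

outputs = movenet_fn(input_image)
# Shape (1, 1, 17, 3): normalized (y, x, score) for each of the 17 landmarks.
keypoints_with_scores = outputs['output_0'].numpy()
print(keypoints_with_scores.shape)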
Part 1: Preprocess the input images
Because the input for our pose classifier is the output landmarks from the MoveNet model, we need to generate our training dataset by running labeled images through MoveNet and then capturing all the landmark data and ground truth labels into a CSV file.
The dataset we've provided for this tutorial is a CG-generated yoga pose dataset. It includes images of multiple CG-generated models doing 5 different yoga poses. The directory is already split into a `train` dataset and a `test` dataset.
So in this section, we'll download the yoga dataset and run it through MoveNet so we can capture all the landmarks into a CSV file… However, it takes about 15 minutes to feed our yoga dataset to MoveNet and generate this CSV file. So as an alternative, you can download a pre-existing CSV file for the yoga dataset by setting the `is_skip_step_1` parameter below to True. That way, you'll skip this step and instead download the same CSV file that will be created in this preprocessing step.
On the other hand, if you want to train the pose classifier with your own image dataset, you need to upload your images and run this preprocessing step (leave `is_skip_step_1` False). Follow the instructions below to upload your own pose dataset.
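These flags live in a parameter cell near the top of the notebook. As a reference, a minimal sketch of such a cell might look like this (the default values shown are assumptions; set them to match your situation):

is_skip_step_1 = False      # True: skip preprocessing and download ready-made CSVs
use_custom_dataset = False  # True: use your own uploaded image dataset
dataset_is_split = False    # True: your dataset already contains train/ and test/ folders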
(Optional) Upload your own pose dataset
If you want to train the pose classifier with your own labeled poses (they can be any poses, not just yoga poses), follow these steps:

1. Set the `use_custom_dataset` option above to True.

2. Prepare an archive file (ZIP, TAR, or other) that includes a folder with your images dataset. The folder must include sorted images of your poses as follows.

   If you've already split your dataset into train and test sets, then set `dataset_is_split` to True. That is, your images folder must include "train" and "test" directories, like this:

   yoga_poses/
   |__ train/
       |__ downdog/
           |______ 00000128.jpg
           |______ ...
   |__ test/
       |__ downdog/
           |______ 00000181.jpg
           |______ ...

   Or, if your dataset is NOT split yet, then set `dataset_is_split` to False and we'll split it up based on a specified split fraction. That is, your uploaded images folder should look like this:

   yoga_poses/
   |__ downdog/
   |______ 00000128.jpg
   |______ 00000181.jpg
   |______ ...
   |__ goddess/
   |______ 00000243.jpg
   |______ 00000306.jpg
   |______ ...

3. Click the Files tab on the left (folder icon) and then click Upload to session storage (file icon).

4. Select your archive file and wait until it finishes uploading before you proceed.

5. Edit the following code block to specify the name of your archive file and images directory. (By default, we expect a ZIP file, so you'll also need to modify that part if your archive is another format.)

6. Now run the rest of the notebook.
if use_custom_dataset:
  # ATTENTION:
  # You must edit these two lines to match your archive and images folder name:
  # !tar -xf YOUR_DATASET_ARCHIVE_NAME.tar
  !unzip -q YOUR_DATASET_ARCHIVE_NAME.zip
  dataset_in = 'YOUR_DATASET_DIR_NAME'

  # You can leave the rest alone:
  if not os.path.isdir(dataset_in):
    raise Exception("dataset_in is not a valid directory")
  if dataset_is_split:
    IMAGES_ROOT = dataset_in
  else:
    dataset_out = 'split_' + dataset_in
    split_into_train_test(dataset_in, dataset_out, test_split=0.2)
    IMAGES_ROOT = dataset_out
Download the yoga dataset
if not is_skip_step_1 and not use_custom_dataset:
  !wget -O yoga_poses.zip http://download.tensorflow.org/data/pose_classification/yoga_poses.zip
  !unzip -q yoga_poses.zip -d yoga_cg
  IMAGES_ROOT = "yoga_cg"
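As a quick, optional sanity check (our illustrative addition, not part of the original flow), you can list the class folders after the download; each subfolder of train/ corresponds to one pose class:

if not is_skip_step_1 and not use_custom_dataset:
  # Each entry printed here is one pose class folder.
  print(os.listdir(os.path.join(IMAGES_ROOT, 'train')))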
Preprocess the `TRAIN` dataset
if not is_skip_step_1:
  images_in_train_folder = os.path.join(IMAGES_ROOT, 'train')
  images_out_train_folder = 'poses_images_out_train'
  csvs_out_train_path = 'train_data.csv'

  preprocessor = MoveNetPreprocessor(
      images_in_folder=images_in_train_folder,
      images_out_folder=images_out_train_folder,
      csvs_out_path=csvs_out_train_path,
  )
  preprocessor.process(per_pose_class_limit=None)
Preprocess the `TEST` dataset
if not is_skip_step_1:
  images_in_test_folder = os.path.join(IMAGES_ROOT, 'test')
  images_out_test_folder = 'poses_images_out_test'
  csvs_out_test_path = 'test_data.csv'

  preprocessor = MoveNetPreprocessor(
      images_in_folder=images_in_test_folder,
      images_out_folder=images_out_test_folder,
      csvs_out_path=csvs_out_test_path,
  )
  preprocessor.process(per_pose_class_limit=None)
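Before moving on, it can help to peek at the CSV that was just written. This is an illustrative check we're adding (assuming step 1 ran): each row holds a file name, 17 landmarks with (x, y, score) each, so 51 feature columns, plus the class number and class name.

if not is_skip_step_1:
  sample_df = pd.read_csv(csvs_out_train_path)
  print(sample_df.shape)                 # rows = images, columns = 1 + 51 + 2
  print(sample_df.columns[:5].tolist())  # peek at the first few column names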
Part 2: Train a pose classification model that takes the landmark coordinates as input and outputs the predicted labels

You'll build a TensorFlow model that takes the landmark coordinates and predicts the pose class that the person in the input image performs. The model consists of two submodels:

- Submodel 1 calculates a pose embedding (a.k.a. feature vector) from the detected landmark coordinates.
- Submodel 2 feeds the pose embedding through several `Dense` layers to predict the pose class.

You'll then train the model based on the dataset that was preprocessed in part 1.
(Optional) Download the preprocessed dataset if you didn't run part 1
# Download the preprocessed CSV files which are the same as the output of step 1
if is_skip_step_1:
  !wget -O train_data.csv http://download.tensorflow.org/data/pose_classification/yoga_train_data.csv
  !wget -O test_data.csv http://download.tensorflow.org/data/pose_classification/yoga_test_data.csv

  csvs_out_train_path = 'train_data.csv'
  csvs_out_test_path = 'test_data.csv'
  is_skipped_step_1 = True
Load the preprocessed CSVs into the `TRAIN` and `TEST` datasets.
def load_pose_landmarks(csv_path):
  """Loads a CSV created by MoveNetPreprocessor.

  Returns:
    X: Detected landmark coordinates and scores of shape (N, 17 * 3)
    y: Ground truth labels of shape (N, label_count)
    classes: The list of all class names found in the dataset
    dataframe: The CSV loaded as a Pandas dataframe features (X) and ground
      truth labels (y) to use later to train a pose classification model.
  """
  # Load the CSV file
  dataframe = pd.read_csv(csv_path)
  df_to_process = dataframe.copy()

  # Drop the file_name column as you don't need it during training.
  df_to_process.drop(columns=['file_name'], inplace=True)

  # Extract the list of class names
  classes = df_to_process.pop('class_name').unique()

  # Extract the labels
  y = df_to_process.pop('class_no')

  # Convert the input features and labels into the correct format for training.
  X = df_to_process.astype('float64')
  y = keras.utils.to_categorical(y)

  return X, y, classes, dataframe
Load and split the original `TRAIN` dataset into `TRAIN` (85% of the data) and `VALIDATE` (the remaining 15%).
# Load the train data
X, y, class_names, _ = load_pose_landmarks(csvs_out_train_path)

# Split training data (X, y) into (X_train, y_train) and (X_val, y_val)
X_train, X_val, y_train, y_val = train_test_split(X, y,
                                                  test_size=0.15)

# Load the test data
X_test, y_test, _, df_test = load_pose_landmarks(csvs_out_test_path)
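A quick shape check (an illustrative addition on our part) confirms the 85/15 split and that each sample is a 51-value landmark vector:

# Illustrative sanity check of the split sizes and feature dimensionality.
print('Train:', X_train.shape)
print('Validation:', X_val.shape)
print('Test:', X_test.shape)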
Define functions to convert the pose landmarks to a pose embedding (a.k.a. feature vector) for pose classification
Next, convert the landmark coordinates to a feature vector by:

- Moving the pose center to the origin.
- Scaling the pose so the pose size becomes 1.
- Flattening these coordinates into a feature vector.

Then use this feature vector to train a neural-network based pose classifier.
def get_center_point(landmarks, left_bodypart, right_bodypart):
  """Calculates the center point of the two given landmarks."""
  left = tf.gather(landmarks, left_bodypart.value, axis=1)
  right = tf.gather(landmarks, right_bodypart.value, axis=1)
  center = left * 0.5 + right * 0.5
  return center
def get_pose_size(landmarks, torso_size_multiplier=2.5):
  """Calculates pose size.

  It is the maximum of two values:
    * Torso size multiplied by `torso_size_multiplier`
    * Maximum distance from pose center to any pose landmark
  """
  # Hips center
  hips_center = get_center_point(landmarks, BodyPart.LEFT_HIP,
                                 BodyPart.RIGHT_HIP)

  # Shoulders center
  shoulders_center = get_center_point(landmarks, BodyPart.LEFT_SHOULDER,
                                      BodyPart.RIGHT_SHOULDER)

  # Torso size as the minimum body size
  torso_size = tf.linalg.norm(shoulders_center - hips_center)

  # Pose center
  pose_center_new = get_center_point(landmarks, BodyPart.LEFT_HIP,
                                     BodyPart.RIGHT_HIP)
  pose_center_new = tf.expand_dims(pose_center_new, axis=1)
  # Broadcast the pose center to the same size as the landmark vector to
  # perform subtraction
  pose_center_new = tf.broadcast_to(pose_center_new,
                                    [tf.size(landmarks) // (17*2), 17, 2])

  # Dist to pose center
  d = tf.gather(landmarks - pose_center_new, 0, axis=0,
                name="dist_to_pose_center")
  # Max dist to pose center
  max_dist = tf.reduce_max(tf.linalg.norm(d, axis=0))

  # Normalize scale
  pose_size = tf.maximum(torso_size * torso_size_multiplier, max_dist)

  return pose_size
def normalize_pose_landmarks(landmarks):
  """Normalizes the landmarks translation by moving the pose center to (0,0) and
  scaling it to a constant pose size.
  """
  # Move landmarks so that the pose center becomes (0,0)
  pose_center = get_center_point(landmarks, BodyPart.LEFT_HIP,
                                 BodyPart.RIGHT_HIP)
  pose_center = tf.expand_dims(pose_center, axis=1)
  # Broadcast the pose center to the same size as the landmark vector to
  # perform subtraction
  pose_center = tf.broadcast_to(pose_center,
                                [tf.size(landmarks) // (17*2), 17, 2])
  landmarks = landmarks - pose_center

  # Scale the landmarks to a constant pose size
  pose_size = get_pose_size(landmarks)
  landmarks /= pose_size

  return landmarks
def landmarks_to_embedding(landmarks_and_scores):
  """Converts the input landmarks into a pose embedding."""
  # Reshape the flat input into a matrix with shape=(17, 3)
  reshaped_inputs = keras.layers.Reshape((17, 3))(landmarks_and_scores)

  # Normalize landmarks 2D
  landmarks = normalize_pose_landmarks(reshaped_inputs[:, :, :2])

  # Flatten the normalized landmark coordinates into a vector
  embedding = keras.layers.Flatten()(landmarks)

  return embedding
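As a sanity check (our illustrative addition, assuming the `BodyPart` enum from the hidden helper cells is defined), you can push a random landmark vector through the embedding: the 51 input values (17 landmarks with x, y, score) become a 34-dimensional embedding, since the scores are dropped and the 17 (x, y) pairs are flattened.

# Illustrative shape check: 51 inputs -> (17, 2) normalized landmarks -> 34-dim embedding.
dummy_landmarks = tf.random.uniform((1, 51))
print(landmarks_to_embedding(dummy_landmarks).shape)  # expected: (1, 34)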
Define a Keras model for pose classification

Our Keras model takes the detected pose landmarks, then calculates the pose embedding and predicts the pose class.
# Define the model
inputs = tf.keras.Input(shape=(51,))
embedding = landmarks_to_embedding(inputs)

layer = keras.layers.Dense(128, activation=tf.nn.relu6)(embedding)
layer = keras.layers.Dropout(0.5)(layer)
layer = keras.layers.Dense(64, activation=tf.nn.relu6)(layer)
layer = keras.layers.Dropout(0.5)(layer)
outputs = keras.layers.Dense(len(class_names), activation="softmax")(layer)

model = keras.Model(inputs, outputs)
model.summary()

model.compile(
    optimizer='adam',
    loss='categorical_crossentropy',
    metrics=['accuracy']
)
# Add a checkpoint callback to store the checkpoint that has the highest
# validation accuracy.
checkpoint_path = "weights.best.hdf5"
checkpoint = keras.callbacks.ModelCheckpoint(checkpoint_path,
                                             monitor='val_accuracy',
                                             verbose=1,
                                             save_best_only=True,
                                             mode='max')
earlystopping = keras.callbacks.EarlyStopping(monitor='val_accuracy',
                                              patience=20)

# Start training
history = model.fit(X_train, y_train,
                    epochs=200,
                    batch_size=16,
                    validation_data=(X_val, y_val),
                    callbacks=[checkpoint, earlystopping])
# Visualize the training history to see whether you're overfitting.
plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.title('Model accuracy')
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.legend(['TRAIN', 'VAL'], loc='lower right')
plt.show()
# Evaluate the model using the TEST dataset
loss, accuracy = model.evaluate(X_test, y_test)
Draw the confusion matrix to better understand the model performance
def plot_confusion_matrix(cm, classes,
                          normalize=False,
                          title='Confusion matrix',
                          cmap=plt.cm.Blues):
  """Plots the confusion matrix."""
  if normalize:
    cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
    print("Normalized confusion matrix")
  else:
    print('Confusion matrix, without normalization')

  plt.imshow(cm, interpolation='nearest', cmap=cmap)
  plt.title(title)
  plt.colorbar()
  tick_marks = np.arange(len(classes))
  plt.xticks(tick_marks, classes, rotation=55)
  plt.yticks(tick_marks, classes)
  fmt = '.2f' if normalize else 'd'
  thresh = cm.max() / 2.
  for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
    plt.text(j, i, format(cm[i, j], fmt),
             horizontalalignment="center",
             color="white" if cm[i, j] > thresh else "black")

  plt.ylabel('True label')
  plt.xlabel('Predicted label')
  plt.tight_layout()
# Classify pose in the TEST dataset using the trained model
y_pred = model.predict(X_test)

# Convert the prediction result to class name
y_pred_label = [class_names[i] for i in np.argmax(y_pred, axis=1)]
y_true_label = [class_names[i] for i in np.argmax(y_test, axis=1)]

# Plot the confusion matrix
cm = confusion_matrix(np.argmax(y_test, axis=1), np.argmax(y_pred, axis=1))
plot_confusion_matrix(cm,
                      class_names,
                      title='Confusion Matrix of Pose Classification Model')

# Print the classification report
print('\nClassification Report:\n', classification_report(y_true_label,
                                                          y_pred_label))
(Optional) Investigate incorrect predictions
You can look at the poses from the `TEST` dataset that were incorrectly predicted to see whether the model accuracy can be improved.
if is_skip_step_1:
  raise RuntimeError('You must have run step 1 to run this cell.')

# If step 1 was skipped, skip this step.
IMAGE_PER_ROW = 3
MAX_NO_OF_IMAGE_TO_PLOT = 30

# Extract the list of incorrectly predicted poses
false_predict = [id_in_df for id_in_df in range(len(y_test)) \
                 if y_pred_label[id_in_df] != y_true_label[id_in_df]]
if len(false_predict) > MAX_NO_OF_IMAGE_TO_PLOT:
  false_predict = false_predict[:MAX_NO_OF_IMAGE_TO_PLOT]

# Plot the incorrectly predicted images
row_count = len(false_predict) // IMAGE_PER_ROW + 1
fig = plt.figure(figsize=(10 * IMAGE_PER_ROW, 10 * row_count))
for i, id_in_df in enumerate(false_predict):
  ax = fig.add_subplot(row_count, IMAGE_PER_ROW, i + 1)
  image_path = os.path.join(images_out_test_folder,
                            df_test.iloc[id_in_df]['file_name'])
  image = cv2.imread(image_path)
  plt.title("Predict: %s; Actual: %s"
            % (y_pred_label[id_in_df], y_true_label[id_in_df]))
  plt.imshow(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
plt.show()
Part 3: Convert the pose classification model to TensorFlow Lite
You'll convert the Keras pose classification model to the TensorFlow Lite format so that you can deploy it to mobile apps, web browsers, and edge devices. When converting the model, you'll apply dynamic range quantization to reduce the pose classification TensorFlow Lite model size by about 4 times with insignificant accuracy loss.
converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
tflite_model = converter.convert()
print('Model size: %dKB' % (len(tflite_model) / 1024))
with open('pose_classifier.tflite', 'wb') as f:
  f.write(tflite_model)
Then you'll write the label file, which contains the mapping from the class indexes to the human-readable class names.
with open('pose_labels.txt', 'w') as f:
  f.write('\n'.join(class_names))
Since you've applied quantization to reduce the model size, let's evaluate the quantized TFLite model to check whether the accuracy drop is acceptable.
def evaluate_model(interpreter, X, y_true):
  """Evaluates the given TFLite model and returns its accuracy."""
  input_index = interpreter.get_input_details()[0]["index"]
  output_index = interpreter.get_output_details()[0]["index"]

  # Run predictions on all given poses.
  y_pred = []
  for i in range(len(y_true)):
    # Pre-processing: add batch dimension and convert to float32 to match with
    # the model's input data format.
    test_image = X[i: i + 1].astype('float32')
    interpreter.set_tensor(input_index, test_image)

    # Run inference.
    interpreter.invoke()

    # Post-processing: remove batch dimension and find the class with highest
    # probability.
    output = interpreter.tensor(output_index)
    predicted_label = np.argmax(output()[0])
    y_pred.append(predicted_label)

  # Compare prediction results with ground truth labels to calculate accuracy.
  y_pred = keras.utils.to_categorical(y_pred)
  return accuracy_score(y_true, y_pred)
# Evaluate the accuracy of the converted TFLite model
classifier_interpreter = tf.lite.Interpreter(model_content=tflite_model)
classifier_interpreter.allocate_tensors()
print('Accuracy of TFLite model: %s' %
      evaluate_model(classifier_interpreter, X_test, y_test))
Now you can download the TFLite model (`pose_classifier.tflite`) and the label file (`pose_labels.txt`) to classify custom poses. See the Android and Python/Raspberry Pi sample apps for an end-to-end example of how to use the TFLite pose classification model.
!zip pose_classifier.zip pose_labels.txt pose_classifier.tflite
# Download the zip archive if running on Colab.
try:
  from google.colab import files
  files.download('pose_classifier.zip')
except:
  pass
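Later, in your own Python application, loading the exported artifacts back is straightforward. Here's a minimal, illustrative sketch (reusing `X_test` as a stand-in input is an assumption; in a real app you'd feed the 51 landmark values produced by MoveNet):

# Illustrative: classify one pose vector with the exported model and label file.
interpreter = tf.lite.Interpreter(model_path='pose_classifier.tflite')
interpreter.allocate_tensors()
input_index = interpreter.get_input_details()[0]['index']
output_index = interpreter.get_output_details()[0]['index']

with open('pose_labels.txt') as f:
  labels = f.read().splitlines()

interpreter.set_tensor(input_index, np.asarray(X_test[0:1], dtype=np.float32))
interpreter.invoke()
scores = interpreter.get_tensor(output_index)[0]
print('Predicted pose:', labels[np.argmax(scores)])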