Action Prediction on Video Data Based on MediaPipe Keypoints

I would like to train a model that predicts the action in a video based on MediaPipe keypoints.

Framenumber,Keypoint_Pose_0_X,Keypoint_Pose_0_Y,Keypoint_Pose_0_Visibility,Keypoint_Pose_1_X,Keypoint_Pose_1_Y,Keypoint_Pose_1_Visibility,Keypoint_Pose_2_X,Keypoint_Pose_2_Y,Keypoint_Pose_2_Visibility,Keypoint_Pose_3_X,Keypoint_Pose_3_Y,Keypoint_Pose_3_Visibility,Keypoint_Pose_4_X,Keypoint_Pose_4_Y,Keypoint_Pose_4_Visibility,Keypoint_Pose_5_X,Keypoint_Pose_5_Y,Keypoint_Pose_5_Visibility,Keypoint_Pose_6_X,Keypoint_Pose_6_Y,Keypoint_Pose_6_Visibility,Keypoint_Pose_7_X,Keypoint_Pose_7_Y,Keypoint_Pose_7_Visibility,Keypoint_Pose_8_X,Keypoint_Pose_8_Y,Keypoint_Pose_8_Visibility,Keypoint_Pose_9_X,Keypoint_Pose_9_Y,Keypoint_Pose_9_Visibility,Keypoint_Pose_10_X,Keypoint_Pose_10_Y,Keypoint_Pose_10_Visibility,Keypoint_Pose_11_X,Keypoint_Pose_11_Y,Keypoint_Pose_11_Visibility,Keypoint_Pose_12_X,Keypoint_Pose_12_Y,Keypoint_Pose_12_Visibility,Keypoint_Pose_13_X,Keypoint_Pose_13_Y,Keypoint_Pose_13_Visibility,Keypoint_Pose_14_X,Keypoint_Pose_14_Y,Keypoint_Pose_14_Visibility,Keypoint_Pose_15_X,Keypoint_Pose_15_Y,Keypoint_Pose_15_Visibility,Keypoint_Pose_16_X,Keypoint_Pose_16_Y,Keypoint_Pose_16_Visibility,Keypoint_Pose_17_X,Keypoint_Pose_17_Y,Keypoint_Pose_17_Visibility,Keypoint_Pose_18_X,Keypoint_Pose_18_Y,Keypoint_Pose_18_Visibility,Keypoint_Pose_19_X,Keypoint_Pose_19_Y,Keypoint_Pose_19_Visibility,Keypoint_Pose_20_X,Keypoint_Pose_20_Y,Keypoint_Pose_20_Visibility,Keypoint_Pose_21_X,Keypoint_Pose_21_Y,Keypoint_Pose_21_Visibility,Keypoint_Pose_22_X,Keypoint_Pose_22_Y,Keypoint_Pose_22_Visibility,Keypoint_Pose_23_X,Keypoint_Pose_23_Y,Keypoint_Pose_23_Visibility,Keypoint_Pose_24_X,Keypoint_Pose_24_Y,Keypoint_Pose_24_Visibility,Keypoint_Pose_25_X,Keypoint_Pose_25_Y,Keypoint_Pose_25_Visibility,Keypoint_Pose_26_X,Keypoint_Pose_26_Y,Keypoint_Pose_26_Visibility,Keypoint_Pose_27_X,Keypoint_Pose_27_Y,Keypoint_Pose_27_Visibility,Keypoint_Pose_28_X,Keypoint_Pose_28_Y,Keypoint_Pose_28_Visibility,Keypoint_Pose_29_X,Keypoint_Pose_29_Y,Keypoint_Pose_29_Visibility,Keypoint_Pose_30_X,Keypoint_Pose_30_Y,Keypoint_Pose_30_Visibility,Keypoint_Pose_31_X,Keypoint_Pose_31_Y,Keypoint_Pose_31_Visibility,Keypoint_Pose_32_X,Keypoint_Pose_32_Y,Keypoint_Pose_32_Visibility,Action
0,481,118,0.9999821186065674,489,106,0.9999444484710692,494,107,0.9998898506164552,498,107,0.9998225569725036,472,105,0.9999796152114868,465,104,0.9999828338623048,460,103,0.9999879598617554,502,114,0.9993197917938232,452,109,0.999990463256836,488,133,0.9998002648353576,469,132,0.9999774694442748,493,182,0.995231568813324,431,184,0.999991536140442,488,273,0.0262717884033918,401,281,0.9973281621932985,513,338,0.0641939714550972,459,362,0.984261393547058,520,357,0.0794331803917884,473,383,0.9602025747299194,525,355,0.0814166516065597,482,373,0.9599066376686096,520,349,0.0855123922228813,480,366,0.9528304934501648,497,370,0.998590648174286,476,375,0.9997231364250184,491,466,0.3849213421344757,487,468,0.9575459361076356,498,563,0.6474338173866272,493,563,0.8860534429550171,492,582,0.7143721580505371,485,582,0.7824604511260986,544,582,0.7379774451255798,537,596,0.8613807559013367,Action_1
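Each row is one frame: Framenumber, then X/Y/Visibility for each of the 33 pose landmarks (indices 0 to 32), then the Action label, i.e. 101 columns in total. A quick sanity check of the layout (just a sketch; the path is an example based on the folder tree further down):

import pandas as pd

df = pd.read_csv('Dataset/Action_1/Dataset_Action_1_1.csv')

# 1 (Framenumber) + 33 landmarks * 3 (X, Y, Visibility) + 1 (Action) = 101 columns
assert df.shape[1] == 101, df.shape
print(df['Action'].unique())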

This is the head of one of the datasets I created from different videos with the following script:

import cv2
import mediapipe as mp
import csv
import time
import logging
import os
import pandas as pd

logging.basicConfig(level=logging.DEBUG, filename="logfile.log", filemode="w", format="%(asctime)s - %(levelname)s - %(message)s")


class HolisticDetector:
    def __init__(self, video_source=0, model_path=None, class_list_path=None, conf_threshold=0.45):
        self.cap = cv2.VideoCapture(video_source)
        self.mp_holistic = mp.solutions.holistic
        self.holistic = self.mp_holistic.Holistic()

        # CSV Initialization for Holistic Detector
        self.holistic_csv_file = open('pose_keypoints.csv', 'w', newline="")
        self.holistic_csv_writer = csv.writer(self.holistic_csv_file)
        
        # Header: Framenumber, then X/Y/Visibility for each of the 33 pose landmarks, then Action
        holistic_header = ['Framenumber']
        for i in range(33):
            holistic_header += [f'Keypoint_Pose_{i}_X', f'Keypoint_Pose_{i}_Y',
                                f'Keypoint_Pose_{i}_Visibility']
        holistic_header.append('Action')


        self.holistic_csv_writer.writerow(holistic_header)

        self.common_start_time = time.time()

    def run(self, frame_number, frame):
        # Convert the BGR image to RGB
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        # Get the results from the holistic model
        results = self.holistic.process(rgb_frame)

        # Check if any keypoints are detected
        if results.pose_landmarks:
            # Extract pose landmarks
            landmarks = results.pose_landmarks.landmark

            # Prepare holistic data for CSV: one row per frame
            height, width, _ = frame.shape
            holistic_data = [frame_number]
            for landmark in landmarks:
                # Convert normalized coordinates to pixel coordinates
                px, py = int(landmark.x * width), int(landmark.y * height)
                holistic_data.extend([px, py, landmark.visibility])

            # Write holistic data to CSV
            self.holistic_csv_writer.writerow(holistic_data)

            # Draw the pose landmarks on the frame
            self.draw_landmarks(frame, landmarks)

    def draw_landmarks(self, frame, landmarks):
        height, width, _ = frame.shape
        for landmark in landmarks:
            cx, cy = int(landmark.x * width), int(landmark.y * height)
            cv2.circle(frame, (cx, cy), 5, (255, 0, 0), -1)

    def release_resources(self):
        self.cap.release()
        self.holistic_csv_file.close()
        cv2.destroyAllWindows()
   
def main():
    # Set the video source (0 for default webcam)
    # video_source = 0
    video_source="Video.mp4"

    if video_source == 0: 
        countdown()

    # Initialize Holistic Detector
    holistic_detector = HolisticDetector(video_source)

    try:
        frame_number = 0  # Initialize frame number
        while True:
            # Get current frame
            ret, frame = holistic_detector.cap.read()
            if not ret:  # end of video or read failure
                break

            # Run Holistic Detector
            holistic_detector.run(frame_number, frame)

            # Increment frame number
            frame_number += 1

            # Display the frame
            cv2.imshow('Holistic Detection', frame)

            # Break the loop if 'q' is pressed
            if cv2.waitKey(10) & 0xFF == ord('q'):
                break

    except Exception as e:
        logging.error(f"An error occurred: {e}")

    finally:
        # Release resources
        holistic_detector.release_resources()

        # Reload the keypoints, attach the action label, and save the final dataset
        merged_df = pd.read_csv('pose_keypoints.csv')
        print(merged_df.head())

        merged_df['Action'] = 'Neutral'
        merged_df.to_csv('Dataset_Neutral_3.csv', index=False)

        os.remove('pose_keypoints.csv')

        logging.info("Dataset_Neutral_3.csv saved.")

if __name__ == "__main__":
    main()

The datasets are stored in separate folders, one per action:

.
└── Dataset/
    ├── Action_1/
    │   ├── Dataset_Action_1_1.csv
    │   └── ...
    └── Action_2/
        ├── Dataset_Action_2_1.csv
        └── ...
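For reference, the per-action folders can also be collected automatically instead of being hard-coded (a small sketch matching the tree above):

from pathlib import Path

# Collect every per-action subfolder under Dataset/
action_folders = sorted(str(p) for p in Path('Dataset').iterdir() if p.is_dir())
# e.g. ['Dataset/Action_1', 'Dataset/Action_2']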

I then started to train a model on these datasets:

import numpy as np
import pandas as pd
import os
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.model_selection import train_test_split
from keras.models import Sequential
from keras.layers import Conv1D, MaxPooling1D, LSTM, Dense
import joblib

class HolisticDatasetMerger:
    def __init__(self, folders):
        self.folders = folders
        self.all_datasets = []

    def merge_datasets(self):
        for folder in self.folders:
            folder_datasets = self._load_datasets_in_folder(folder)
            folder_dataset = pd.concat(folder_datasets, ignore_index=True)
            self.all_datasets.append(folder_dataset)

        full_dataset = pd.concat(self.all_datasets, ignore_index=True)
        return full_dataset

    def _load_datasets_in_folder(self, folder):
        folder_datasets = []
        for file_name in os.listdir(folder):
            if file_name.endswith('.csv'):
                file_path = os.path.join(folder, file_name)
                df = pd.read_csv(file_path)
                folder_datasets.append(df)
        return folder_datasets

def build_cnn_rnn_model(input_shape, num_classes):
    model = Sequential()
    model.add(Conv1D(filters=64, kernel_size=3, activation='relu', input_shape=input_shape))
    model.add(MaxPooling1D(pool_size=2))
    model.add(LSTM(100))
    model.add(Dense(num_classes, activation='softmax'))
    model.compile(loss="sparse_categorical_crossentropy", optimizer="adam", metrics=['accuracy'])
    return model

def train_and_save_model(folders, model_path="holistic_model.h5", encoder_path="label_encoder.joblib", scaler_path="scaler.joblib"):
    dataset_merger = HolisticDatasetMerger(folders)
    full_dataset = dataset_merger.merge_datasets()

    
    # Frame-to-frame differences of the pixel coordinates.
    # Note: there are 33 pose landmarks (indices 0-32), so range(33), not range(32).
    diff_columns = ([f'Keypoint_Pose_{i}_X' for i in range(33)] +
                    [f'Keypoint_Pose_{i}_Y' for i in range(33)])
    full_dataset[diff_columns] = full_dataset.groupby('Framenumber')[diff_columns].diff()

    
    label_encoder = LabelEncoder()
    full_dataset['Action_Label'] = label_encoder.fit_transform(full_dataset['Action'])

    
    features = full_dataset.drop(['Framenumber', 'Action', 'Action_Label'], axis=1)
    labels = full_dataset['Action_Label']
    X_train, X_test, y_train, y_test = train_test_split(features, labels, test_size=0.2, random_state=42)

    
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    
    X_train_reshaped = X_train_scaled.reshape((X_train_scaled.shape[0], X_train_scaled.shape[1], 1))
    X_test_reshaped = X_test_scaled.reshape((X_test_scaled.shape[0], X_test_scaled.shape[1], 1))

    
    input_shape = (X_train_reshaped.shape[1], 1)
    num_classes = len(label_encoder.classes_)
    model = build_cnn_rnn_model(input_shape, num_classes)
    model.fit(X_train_reshaped, y_train, epochs=10, batch_size=32, validation_split=0.2)

    
    model.save(model_path)
    joblib.dump(label_encoder, encoder_path)
    joblib.dump(scaler, scaler_path)

        
def main():
    folders = ['Dataset/Action_1', 'Dataset/Action_2']  # matches the folder tree above
    model_path = "holistic_model.h5"
    encoder_path = "label_encoder.joblib"
    scaler_path = "scaler.joblib"

    train_and_save_model(folders, model_path, encoder_path, scaler_path)

if __name__ == "__main__":
    main()
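Note: train_and_save_model above never actually uses the held-out test split. If useful, a minimal evaluation (a sketch, using the same variable names) could go right after model.fit:

# Sketch: evaluate on the held-out split; would sit inside train_and_save_model, after model.fit
test_loss, test_acc = model.evaluate(X_test_reshaped, y_test, verbose=0)
print(f'Test accuracy: {test_acc:.3f}')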

Finally, I would like to test the trained model on a new video:

import cv2
import mediapipe as mp
import numpy as np
import joblib
from keras.models import load_model

def load_model_and_encoder(model_path="holistic_model.h5", encoder_path="label_encoder.joblib"):
    
    model = load_model(model_path)
    label_encoder = joblib.load(encoder_path)
    return model, label_encoder

def process_live_data(model, label_encoder, scaler):
    cap = cv2.VideoCapture('Video.MP4')  
    mp_holistic = mp.solutions.holistic
    holistic = mp_holistic.Holistic()

    frame_number = 0
    prev_data = None  # keypoints of the previous frame, for the frame-to-frame diff

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        
        results = holistic.process(rgb_frame)

        
        if results.pose_landmarks:
            # Extract pose landmarks
            landmarks = results.pose_landmarks.landmark

            # Prepare holistic data for model input (same layout as the training CSV)
            height, width, _ = frame.shape
            input_data = [frame_number]
            for landmark in landmarks:
                # Convert normalized coordinates to pixel coordinates
                px, py = int(landmark.x * width), int(landmark.y * height)
                input_data.extend([px, py, landmark.visibility])

            # Calculate differences between consecutive frames
            current_data = np.array(input_data[1:], dtype=float)
            if prev_data is None:  # first detected frame: diff is all zeros
                prev_data = current_data
            diff_data = current_data - prev_data
            prev_data = current_data

            # Scale and reshape exactly like the training features
            input_data_scaled = scaler.transform(diff_data.reshape(1, -1))
            input_data_reshaped = input_data_scaled.reshape((1, input_data_scaled.shape[1], 1))

            # Make predictions using the model
            probabilities = model.predict(input_data_reshaped)
            predicted_class = np.argmax(probabilities)

            # Display the predicted action and probabilities on the frame
            # (np.argmax always returns an index, so no None check is needed)
            predicted_action = label_encoder.inverse_transform([predicted_class])[0]
            text = f'Predicted Action: {predicted_action} - Probabilities: {probabilities[0]}'
            cv2.putText(frame, text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

            # Draw the pose landmarks on the frame
            draw_landmarks(frame, results.pose_landmarks.landmark)

        # Display the frame
        cv2.imshow('Holistic Detection', frame)

        # Increment the frame number
        frame_number += 1

        # Break the loop if 'q' is pressed
        if cv2.waitKey(10) & 0xFF == ord('q'):
            break

    # Release resources
    cap.release()
    cv2.destroyAllWindows()

def draw_landmarks(frame, landmarks):
    height, width, _ = frame.shape
    for landmark in landmarks:
        cx, cy = int(landmark.x * width), int(landmark.y * height)
        cv2.circle(frame, (cx, cy), 5, (255, 0, 0), -1)

def main():
    model, label_encoder = load_model_and_encoder()

    
    scaler = joblib.load('scaler.joblib')

    
    process_live_data(model, label_encoder, scaler)

if __name__ == "__main__":
    main()

My problem is that the predicted action displayed in the test video is always the same and does not match the actions that can actually be seen in the video.
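For what it's worth, the label balance of the merged training data can be checked like this (a sketch reusing the HolisticDatasetMerger class from the training script above, since heavy class imbalance could also lead to constant predictions):

# Sketch: inspect the label balance of the merged training data
merger = HolisticDatasetMerger(['Dataset/Action_1', 'Dataset/Action_2'])
print(merger.merge_datasets()['Action'].value_counts())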

Does anyone have suggestions for improving this kind of action prediction?
I would be grateful for any help!
