# Patch "First commit" (arthurttz) — extracted sources.
# NOTE(review): the original was a mangled git email patch; diff metadata dropped.

# === datasettransform.py ===
# Turns the .txt files into 8x512 matrices; this script is used in traindata.py.

import torch
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
import os

train_batch_size = 16
data_dir = "C:/DATA/M1/Stages/Fablab/Dataset"

LST2 = []
label = []

# Walk Dataset/<class>/<file>: the subfolder's enumeration index is the label.
for ID, sub_dir in enumerate(os.listdir(data_dir)):
    for file in os.listdir(os.path.join(data_dir, sub_dir)):
        label.append(ID)

        # FIX: the original opened the file with 'r+' and never closed the
        # handle; read-only access inside a context manager instead.
        with open(os.path.join(data_dir, sub_dir, file), 'r') as f:
            lst_f = [int(line.replace('\n', '')) for line in f]

        # Keep the first 4096 samples and fold them into an 8x512 matrix.
        matrice = np.array(lst_f[:4096]).reshape(8, 512)
        LST2.append(matrice)

# X becomes (n_files, channel, rows, samples); unsqueeze(1) adds the single
# input channel expected by EEGNet.
X = torch.Tensor(np.array(LST2)).unsqueeze(1)
Y = torch.LongTensor(label)

# Build the DataLoader.
dataset = TensorDataset(X, Y)
data_loader = DataLoader(dataset, batch_size=train_batch_size, shuffle=True)

print(f' Longueur de la liste: {len(LST2)}')
print(f' Nombre de fichiers: {len(label)}')
print(f' Forme de la liste des tensor: {X.shape}')
print("train batch size:", data_loader.batch_size,
      ", num of batch:", len(data_loader))

# === graph.py ===
# FIX: the original line was a SyntaxError:
#     from traindata import loss_train_hist, acc_train_hist, loss_val_hist = []
# Import all four recorded history lists from traindata instead.
from traindata import loss_train_hist, acc_train_hist, loss_val_hist, acc_val_hist

# === inspi.py ===
# Contains a variant of the model and a custom Dataset; reference material
# for cleaning up datasettransform.py and reading the files.

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import numpy as np
import matplotlib.pyplot as plt
from torchmetrics import Accuracy
import os

# --- Global parameters ---
BATCH_SIZE = 8
EPOCHS = 50
fs = 173
channel = 1
num_input = 1
num_class = 5
signal_length = 4097
device = 'cpu'

# --- EEGNet architecture (unchanged) ---
F1, D = 8, 3
F2 = D * F1

class EEGNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv2d = nn.Conv2d(num_input, F1, (1, round(fs/2)), padding=(0, round(fs/4)-1))
        self.Batch_normalization_1 = nn.BatchNorm2d(F1)
        self.Depthwise_conv2D = nn.Conv2d(F1, D*F1, (channel, 1), groups=F1)
        self.Batch_normalization_2 = nn.BatchNorm2d(D*F1)
        self.Elu = nn.ELU()
        self.Average_pooling2D_1 = nn.AvgPool2d((1, 4))
        self.Dropout = nn.Dropout2d(0.2)
        self.Separable_conv2D_depth = nn.Conv2d(D*F1, D*F1, (1, round(fs/8)), padding=(0, round(fs/16)), groups=D*F1)
        self.Separable_conv2D_point = nn.Conv2d(D*F1, F2, (1, 1))
        self.Batch_normalization_3 = nn.BatchNorm2d(F2)
        self.Average_pooling2D_2 = nn.AvgPool2d((1, 8))
        self.Flatten = nn.Flatten()
        # Dense layer input size computed dynamically from the signal length
        # (two average pools of 4 and 8 divide the time axis by 32).
        self.Dense = nn.Linear(F2 * (signal_length // 32), num_class)
        self.Softmax = nn.Softmax(dim=1)
nn.Softmax(dim=1) + + def forward(self, x): + y = self.Batch_normalization_1(self.conv2d(x)) + y = self.Batch_normalization_2(self.Depthwise_conv2D(y)) + y = self.Elu(y) + y = self.Dropout(self.Average_pooling2D_1(y)) + y = self.Separable_conv2D_depth(y) + y = self.Batch_normalization_3(self.Separable_conv2D_point(y)) + y = self.Elu(y) + y = self.Dropout(self.Average_pooling2D_2(y)) + y = self.Flatten(y) + return self.Softmax(self.Dense(y)) + +# --- Dataset adapté à ton architecture de dossiers --- +class EEGDataset(torch.utils.data.Dataset): + def __init__(self, root_path, signal_length=4097): + self.root_path = root_path + self.signal_length = signal_length + self.class_map = {'F': 0, 'N': 1, 'O': 2, 'S': 3, 'Z': 4} + self.samples = [] + + # Parcourt récursivement root_path (ex: data/train/) pour trouver les .txt dans F, N, O, S, Z + for class_name in self.class_map.keys(): + class_folder = os.path.join(root_path, class_name) + if os.path.exists(class_folder): + for f in os.listdir(class_folder): + if f.lower().endswith('.txt'): + self.samples.append((os.path.join(class_folder, f), self.class_map[class_name])) + + def __len__(self): + return len(self.samples) + + def __getitem__(self, idx): + file_path, label = self.samples[idx] + signal = [] + with open(file_path, 'r') as f: + for line in f: + line = line.strip() + if line and not line.startswith('['): + try: + signal.append(float(line)) + except ValueError: continue + + # Padding ou troncature pour assurer signal_length + if len(signal) < self.signal_length: + signal.extend([0.0] * (self.signal_length - len(signal))) + else: + signal = signal[:self.signal_length] + + x = torch.tensor(signal, dtype=torch.float32).view(1, 1, -1) + return x, torch.tensor(label, dtype=torch.long) + +# --- Initialisation des données --- +data_root = "C:/DATA/M1/Stages/Fablab/data" +train_loader = DataLoader(EEGDataset(os.path.join(data_root, 'train')), + batch_size=BATCH_SIZE, + shuffle=True) +val_loader = 
DataLoader(EEGDataset(os.path.join(data_root, 'val')), + batch_size=BATCH_SIZE, + shuffle=False) +test_loader = DataLoader(EEGDataset(os.path.join(data_root, 'test')), + batch_size=BATCH_SIZE, + shuffle=False) + +# --- Fonction de Visualisation --- +def plot_eeg_samples(loader, n_samples=3): + samples, labels = next(iter(loader)) + classes_names = {0: 'F', 1: 'N', 2: 'O', 3: 'S', 4: 'Z'} + + plt.figure(figsize=(12, 8)) + for i in range(min(n_samples, len(samples))): + plt.subplot(n_samples, 1, i+1) + plt.plot(samples[i].view(-1).numpy()) + plt.title(f"Classe: {classes_names[labels[i].item()]}") + plt.ylabel("Amplitude") + plt.tight_layout() + plt.show() + +# Test de visualisation +plot_eeg_samples(train_loader) \ No newline at end of file diff --git a/model.py b/model.py new file mode 100644 index 0000000..619d688 --- /dev/null +++ b/model.py @@ -0,0 +1,99 @@ +# ARCHITECTURE DU MODELE EEGNET + +import torch +import torchvision +import torch.nn as nn +import torch.optim as optim +from torch.utils.data import DataLoader, TensorDataset +from torchmetrics import Accuracy +import torch.utils.data as data + +#print(torch.cuda.is_available()) +# torch.cuda.get_device_name(0) + +# Not Installed: +# - Nsight for Visual Studio 2022 +# Reason: VS2022 was not found +# - Nsight for Visual Studio 2026 +# Reason: VS2026 was not found + + +### SET MODEL SPECIFICATIONS ### +train_batch_size = 8 +EPOCHS = 50 + +fs= 173 #sampling frequency +channel= 8 #number of electrode +num_input= 1 #number of channel picture (for EEG signal is always : 1) +num_class= 5 #number of classes +signal_length = 512 #number of sample in each tarial + +F1= 8 #number of temporal filters +D= 3 #depth multiplier (number of spatial filters) +F2= D*F1 #number of pointwise filters + +device= 'cpu' +kernel_size_1= (1,round(fs/2)) +kernel_size_2= (channel, 1) +kernel_size_3= (1, round(fs/8)) +kernel_size_4= (1, 1) + +kernel_avgpool_1= (1,4) +kernel_avgpool_2= (1,8) +dropout_rate= 0 + +ks0= 
int(round((kernel_size_1[0]-1)/2)) +ks1= int(round((kernel_size_1[1]-1)/2)) +kernel_padding_1= (ks0, ks1-1) +ks0= int(round((kernel_size_3[0]-1)/2)) +ks1= int(round((kernel_size_3[1]-1)/2)) +kernel_padding_3= (ks0, ks1) + + +### DESIGN MODEL ### +class EEGNet(nn.Module): + def __init__(self): + super().__init__() + # layer 1 + self.conv2d = nn.Conv2d(num_input, F1, kernel_size_1, padding=kernel_padding_1) + self.Batch_normalization_1 = nn.BatchNorm2d(F1) + # layer 2 + self.Depthwise_conv2D = nn.Conv2d(F1, D*F1, kernel_size_2, groups= F1) + self.Batch_normalization_2 = nn.BatchNorm2d(D*F1) + self.elu = nn.ELU() + self.Average_pooling2D_1 = nn.AvgPool2d(kernel_avgpool_1) + self.Dropout = nn.Dropout2d(dropout_rate) + # layer 3 + self.Separable_conv2D_depth = nn.Conv2d( D*F1, D*F1, kernel_size_3, + padding=kernel_padding_3, groups= D*F1) + self.Separable_conv2D_point = nn.Conv2d(D*F1, F2, kernel_size_4) + self.Batch_normalization_3 = nn.BatchNorm2d(F2) + self.Average_pooling2D_2 = nn.AvgPool2d(kernel_avgpool_2) + # layer 4 + self.Flatten = nn.Flatten() + self.Dense = nn.Linear( + F2*round(signal_length/34), + # 2880, + num_class) + self.Softmax = nn.Softmax(dim= 1) + + def forward(self, x): + # layer 1 + y = self.Batch_normalization_1(self.conv2d(x)) #.relu() + # layer 2 + y = self.Batch_normalization_2(self.Depthwise_conv2D(y)) + y = self.elu(y) + y = self.Dropout(self.Average_pooling2D_1(y)) + # layer 3 + y = self.Separable_conv2D_depth(y) + y = self.Batch_normalization_3(self.Separable_conv2D_point(y)) + y = self.elu(y) + y = self.Dropout(self.Average_pooling2D_2(y)) + # layer 4 + y = self.Flatten(y) + y = self.Dense(y) + y = self.Softmax(y) + + return y + +model001 = EEGNet() \ No newline at end of file diff --git a/split.py b/split.py new file mode 100644 index 0000000..5a2c058 --- /dev/null +++ b/split.py @@ -0,0 +1,8 @@ +# PREPARE LES DOSSIERS EN SEPARANT LES DONNEES 4097 + +import splitfolders +import os + +data_dir = "C:/DATA/M1/Stages/Fablab/Dataset" + 
# FIX: the original called split.ratio — "split" is undefined (the imported
# module is splitfolders) — and passed a non-raw backslash Windows path.
splitfolders.ratio(data_dir, output=r"C:\DATA\M1\Stages\Fablab",
                   seed=123, ratio=(0.8, 0.1, 0.1))

# === splitfoldersclean.py ===
# Prepares the folders by splitting the 4096-sample data.

import splitfolders
import os

data_dir = r"C:/DATA/M1/Stages/Fablab/Dataset"

splitfolders.ratio(data_dir, output=r"C:/DATA/M1/Stages/Fablab/dataclean",
                   seed=123, ratio=(0.8, 0.1, 0.1))

# === test.py ===
from traindata import *
from model import *

def test():
    """Evaluate the trained model on the test set and print its accuracy."""
    model001 = EEGNet()

    # FIX: the original passed the data *directory* to torch.load; load the
    # saved checkpoint file instead (path previously left in a comment).
    model001.load_state_dict(torch.load("NetModel.pth"))
    model001.eval()

    running_accuracy = 0
    total = 0

    with torch.no_grad():
        # NOTE(review): test_loader must come from the starred imports above —
        # the visible traindata/datasettransform only define data_loader; confirm.
        for inputs, targets in test_loader:
            # FIX: the original called the undefined name "model"; use model001.
            predicted_outputs = model001(inputs)
            _, predicted = torch.max(predicted_outputs, 1)
            total += targets.size(0)
            running_accuracy += (predicted == targets).sum().item()

    # FIX: "test_split" was undefined; report the counted sample total instead.
    print('Accuracy of the model based on the test set of', total,
          'inputs is: %d %%' % (100 * running_accuracy / total))

# === traindata.py ===
### COPY OF TRAINDATA.PY IN CASE THE CODE DOES NOT WORK ###

# Runs training, computes loss and accuracy, and plots the associated graphs.

from datasettransform import *
from model import *

from torchmetrics import Accuracy
import torch.nn as nn
import matplotlib.pyplot as plt
import torch.optim as optim
import pandas as pd
import os

device = 'cpu'


### TRAIN FUNCTION ###
def train_one_epoch(model, train_loader, loss_fn, optimizer):
    """Run one optimisation pass over train_loader.

    Returns (model, average training loss, training accuracy).
    """
    model.train()
    loss_train = AverageMeter()
    acc_train = Accuracy(task="multiclass", num_classes=num_class).to(device)

    for inputs, targets in train_loader:
        inputs = inputs.to(device)
        targets = targets.to(device)

        outputs = model(inputs)
        loss = loss_fn(outputs, targets)

        loss.backward()
        # Clip gradient norm to 1 to stabilise training before stepping.
        nn.utils.clip_grad_norm_(model.parameters(), 1)
        optimizer.step()
        optimizer.zero_grad()

        loss_train.update(loss.item())
        acc_train(outputs, targets.int())

    return model, loss_train.avg, acc_train.compute().item()

def validation(model, val_loader, loss_fn):
    """Evaluate on val_loader without updating weights.

    Returns (model, average validation loss, validation accuracy).
    """
    model.eval()
    loss_val = AverageMeter()
    acc_val = Accuracy(task="multiclass", num_classes=num_class).to(device)

    # FIX: run under no_grad — validation must not build autograd graphs.
    # The original also called clip_grad_norm_ here, a copy-paste no-op
    # (no gradients exist without backward()); removed.
    with torch.no_grad():
        for inputs, targets in val_loader:
            inputs = inputs.to(device)
            targets = targets.to(device)

            outputs = model(inputs)
            loss = loss_fn(outputs, targets)

            loss_val.update(loss.item())
            acc_val(outputs, targets.int())

    return model, loss_val.avg, acc_val.compute().item()

# TODO: add a test() function here?
### UTILS ###
class AverageMeter(object):
    """Computes and stores the average and current value."""
    def __init__(self):
        self.reset()

    def reset(self):
        """Clear all running statistics."""
        self.val = 0
        self.avg = 0
        self.sum = 0
        self.count = 0

    def update(self, val, n=1):
        """Record `val` with weight `n` and refresh the running average."""
        self.val = val
        self.sum += val * n
        self.count += n
        self.avg = self.sum / self.count


### TRAINING ###
loss_fn = nn.CrossEntropyLoss().to(device)
optimizer = optim.NAdam(model001.parameters(), lr=0.01)

loss_train_hist = []
acc_train_hist = []
loss_val_hist = []
acc_val_hist = []

for epoch in range(EPOCHS):
    # NOTE(review): the same data_loader is used for both training and
    # "validation", so the val metrics are measured on the training data —
    # wire in a distinct validation loader when one is available.
    model, loss_train, acc_train = train_one_epoch(model001,
                                                   data_loader,
                                                   loss_fn,
                                                   optimizer)
    model, loss_val, acc_val = validation(model001,
                                          data_loader,
                                          loss_fn)

    loss_train_hist.append(loss_train)
    acc_train_hist.append(acc_train)
    loss_val_hist.append(loss_val)
    acc_val_hist.append(acc_val)

    # Log progress on epochs ending in 0 or 5.
    if (epoch % 10 == 5) or (epoch % 10 == 0):
        print(f'epoch {epoch}:')
        print(f' train loss= {loss_train:.4}, val loss={loss_val:.4}, train acc= {int(acc_train*100)}%, val acc= {int(acc_val*100)}% \n')


# Save the results as csv.
dataframe = pd.DataFrame({"loss_train": loss_train_hist, "acc_train": acc_train_hist,
                          "loss_val": loss_val_hist, "acc_val": acc_val_hist})
dir_export = "./resultats"
# FIX: os.listdir crashed when the results folder did not exist yet.
os.makedirs(dir_export, exist_ok=True)
# Next free index = number of csv files already exported.
n = sum(1 for file in os.listdir(dir_export) if file.endswith(".csv"))
dataframe.to_csv(f"./resultats/model_perf_{n}.csv", sep=',', index=False)

### LEARNING CURVE ###
# plt.plot(range(EPOCHS), acc_train_hist, 'b-', label='Train')
# plt.xlabel('Epoch')
# plt.ylabel('Acc')
# plt.grid(True)
# plt.legend()
# plt.title('Evolution de l entraînement (train)')
# plt.show()
# # print(len(acc_train_hist))

# ### LOSS CURVES ###
# plt.plot(loss_train_hist, label='Train loss')
# plt.plot(loss_val_hist, label='Val loss')
# plt.xlabel('Epoch')
# plt.ylabel('Loss')
# plt.grid(True)
# plt.legend()
# plt.title('Evolution de la perte (loss)')
# plt.show()

# ### ACCURACY CURVES ###
# plt.plot(acc_train_hist, label='Train acc')
# plt.plot(acc_val_hist, label='Val acc')
# plt.xlabel('Epoch')
# plt.ylabel('Acc')
# plt.grid(True)
# plt.legend()
# plt.title('Evolution de la précision (acc)')
# plt.show()