commit 8e99dae59d34a0ebad4b23459c38374cf94e9696 Author: arthurttz Date: Wed May 13 11:39:59 2026 +0200 First commit diff --git a/datasettransform.py b/datasettransform.py new file mode 100644 index 0000000..b39b27f --- /dev/null +++ b/datasettransform.py @@ -0,0 +1,43 @@ +# TRANSFORME LES FICHIERS .TXT EN MATRICE 8x512, CE SCRIPT EST UTILISE DANS TRAINDATA.PY + +import torch +from torch.utils.data import DataLoader, TensorDataset +import numpy as np +import os + +train_batch_size = 16 +data_dir = "C:/DATA/M1/Stages/Fablab/Dataset" + +LST2 = [] +label = [] + +for ID, sub_dir in enumerate(os.listdir(data_dir)): + for file in os.listdir(os.path.join(data_dir, sub_dir)): + label.append(ID) + + f = open(os.path.join(data_dir, sub_dir, file), 'r+') + lst_f = [] + + for i in f: + i = i.replace('\n', '') + lst_f.append(int(i)) + + # matrice = np.array(lst_f).reshape(8, 512) + # LST_2D.append(matrice) + + matrice = np.array(lst_f[:4096]).reshape(8, 512) + LST2.append(matrice) + +# X devient (nombre de fichiers, channel, listes, données), avec 1 pour le canal d'entrée requis par EEGNet +X = torch.Tensor(np.array(LST2)).unsqueeze(1) +Y = torch.LongTensor(label) + +# créer le DataLoader +dataset = TensorDataset(X, Y) +data_loader = DataLoader(dataset, batch_size=train_batch_size, shuffle=True) + +print(f' Longueur de la liste: {len(LST2)}') +print(f' Nombre de fichiers: {len(label)}') +print(f' Forme de la liste des tensor: {X.shape}') +print("train batch size:",data_loader.batch_size, + ", num of batch:", len(data_loader)) \ No newline at end of file diff --git a/graph.py b/graph.py new file mode 100644 index 0000000..6f63abb --- /dev/null +++ b/graph.py @@ -0,0 +1,3 @@ +from traindata import loss_train_hist, acc_train_hist, loss_val_hist = [] +acc_val_hist = [] + diff --git a/inspi.py b/inspi.py new file mode 100644 index 0000000..a7e96fe --- /dev/null +++ b/inspi.py @@ -0,0 +1,124 @@ +# CONTIENT UNE VERSION DU MODELE ET UN DATASET PERSONNALISE, +# JE PEUX M'EN INSPIRER POUR NETTOYER DATASETTRANSFORM.PY ET LIRE LES FICHIERS + +import torch +import torch.nn as nn +import torch.optim as optim +from torch.utils.data import DataLoader +import numpy as np +import matplotlib.pyplot as plt +from torchmetrics import Accuracy +import os + +# --- Paramètres globaux --- +BATCH_SIZE = 8 +EPOCHS = 50 +fs = 173 +channel = 1 +num_input = 1 +num_class = 5 +signal_length = 4097 +device = 'cpu' + +# --- Architecture EEGNet (inchangée) --- +F1, D = 8, 3 +F2 = D * F1 + +class EEGNet(nn.Module): + def __init__(self): + super().__init__() + self.conv2d = nn.Conv2d(num_input, F1, (1, round(fs/2)), padding=(0, round(fs/4)-1)) + self.Batch_normalization_1 = nn.BatchNorm2d(F1) + self.Depthwise_conv2D = nn.Conv2d(F1, D*F1, (channel, 1), groups=F1) + self.Batch_normalization_2 = nn.BatchNorm2d(D*F1) + self.Elu = nn.ELU() + self.Average_pooling2D_1 = nn.AvgPool2d((1, 4)) + self.Dropout = nn.Dropout2d(0.2) + self.Separable_conv2D_depth = nn.Conv2d(D*F1, D*F1, (1, round(fs/8)), padding=(0, round(fs/16)), groups=D*F1) + self.Separable_conv2D_point = nn.Conv2d(D*F1, F2, (1, 1)) + self.Batch_normalization_3 = nn.BatchNorm2d(F2) + self.Average_pooling2D_2 = nn.AvgPool2d((1, 8)) + self.Flatten = nn.Flatten() + # Calcul dynamique de la taille de sortie pour la couche Dense + self.Dense = nn.Linear(F2 * (signal_length // 32), num_class) + self.Softmax = nn.Softmax(dim=1) + + def forward(self, x): + y = self.Batch_normalization_1(self.conv2d(x)) + y = self.Batch_normalization_2(self.Depthwise_conv2D(y)) + y = self.Elu(y) + y = self.Dropout(self.Average_pooling2D_1(y)) + y = self.Separable_conv2D_depth(y) + y = self.Batch_normalization_3(self.Separable_conv2D_point(y)) + y = self.Elu(y) + y = self.Dropout(self.Average_pooling2D_2(y)) + y = self.Flatten(y) + return self.Softmax(self.Dense(y)) + +# --- Dataset adapté à ton architecture de dossiers --- +class EEGDataset(torch.utils.data.Dataset): + def __init__(self, root_path, signal_length=4097): + self.root_path = root_path + self.signal_length = signal_length + self.class_map = {'F': 0, 'N': 1, 'O': 2, 'S': 3, 'Z': 4} + self.samples = [] + + # Parcourt récursivement root_path (ex: data/train/) pour trouver les .txt dans F, N, O, S, Z + for class_name in self.class_map.keys(): + class_folder = os.path.join(root_path, class_name) + if os.path.exists(class_folder): + for f in os.listdir(class_folder): + if f.lower().endswith('.txt'): + self.samples.append((os.path.join(class_folder, f), self.class_map[class_name])) + + def __len__(self): + return len(self.samples) + + def __getitem__(self, idx): + file_path, label = self.samples[idx] + signal = [] + with open(file_path, 'r') as f: + for line in f: + line = line.strip() + if line and not line.startswith('['): + try: + signal.append(float(line)) + except ValueError: continue + + # Padding ou troncature pour assurer signal_length + if len(signal) < self.signal_length: + signal.extend([0.0] * (self.signal_length - len(signal))) + else: + signal = signal[:self.signal_length] + + x = torch.tensor(signal, dtype=torch.float32).view(1, 1, -1) + return x, torch.tensor(label, dtype=torch.long) + +# --- Initialisation des données --- +data_root = "C:/DATA/M1/Stages/Fablab/data" +train_loader = DataLoader(EEGDataset(os.path.join(data_root, 'train')), + batch_size=BATCH_SIZE, + shuffle=True) +val_loader = DataLoader(EEGDataset(os.path.join(data_root, 'val')), + batch_size=BATCH_SIZE, + shuffle=False) +test_loader = DataLoader(EEGDataset(os.path.join(data_root, 'test')), + batch_size=BATCH_SIZE, + shuffle=False) + +# --- Fonction de Visualisation --- +def plot_eeg_samples(loader, n_samples=3): + samples, labels = next(iter(loader)) + classes_names = {0: 'F', 1: 'N', 2: 'O', 3: 'S', 4: 'Z'} + + plt.figure(figsize=(12, 8)) + for i in range(min(n_samples, len(samples))): + plt.subplot(n_samples, 1, i+1) + plt.plot(samples[i].view(-1).numpy()) + plt.title(f"Classe: {classes_names[labels[i].item()]}") + plt.ylabel("Amplitude") + plt.tight_layout() + plt.show() + +# Test de visualisation +plot_eeg_samples(train_loader) \ No newline at end of file diff --git a/model.py b/model.py new file mode 100644 index 0000000..619d688 --- /dev/null +++ b/model.py @@ -0,0 +1,99 @@ +# ARCHITECTURE DU MODELE EEGNET + +import torch +import torchvision +import torch.nn as nn +import torch.optim as optim +from torch.utils.data import DataLoader, TensorDataset +from torchmetrics import Accuracy +import torch.utils.data as data + +#print(torch.cuda.is_available()) +# torch.cuda.get_device_name(0) + +# Not Installed: +# - Nsight for Visual Studio 2022 +# Reason: VS2022 was not found +# - Nsight for Visual Studio 2026 +# Reason: VS2026 was not found + + +### SET MODEL SPECIFICATIONS ### +train_batch_size = 8 +EPOCHS = 50 + +fs= 173 #sampling frequency +channel= 8 #number of electrode +num_input= 1 #number of channel picture (for EEG signal is always : 1) +num_class= 5 #number of classes +signal_length = 512 #number of sample in each tarial + +F1= 8 #number of temporal filters +D= 3 #depth multiplier (number of spatial filters) +F2= D*F1 #number of pointwise filters + +device= 'cpu' +kernel_size_1= (1,round(fs/2)) +kernel_size_2= (channel, 1) +kernel_size_3= (1, round(fs/8)) +kernel_size_4= (1, 1) + +kernel_avgpool_1= (1,4) +kernel_avgpool_2= (1,8) +dropout_rate= 0 + +ks0= int(round((kernel_size_1[0]-1)/2)) +ks1= int(round((kernel_size_1[1]-1)/2)) +kernel_padding_1= (ks0, ks1-1) +ks0= int(round((kernel_size_3[0]-1)/2)) +ks1= int(round((kernel_size_3[1]-1)/2)) +kernel_padding_3= (ks0, ks1) + + +### DESIGN MODEL ### +class EEGNet(nn.Module): + def __init__(self): + super().__init__() + # layer 1 + self.conv2d = nn.Conv2d(num_input, F1, kernel_size_1, padding=kernel_padding_1) + self.Batch_normalization_1 = nn.BatchNorm2d(F1) + # layer 2 + self.Depthwise_conv2D = nn.Conv2d(F1, D*F1, kernel_size_2, groups= F1) + self.Batch_normalization_2 = nn.BatchNorm2d(D*F1) + self.elu = nn.ELU() + self.Average_pooling2D_1 = nn.AvgPool2d(kernel_avgpool_1) + self.Dropout = nn.Dropout2d(dropout_rate) + # layer 3 + self.Separable_conv2D_depth = nn.Conv2d( D*F1, D*F1, kernel_size_3, + padding=kernel_padding_3, groups= D*F1) + self.Separable_conv2D_point = nn.Conv2d(D*F1, F2, kernel_size_4) + self.Batch_normalization_3 = nn.BatchNorm2d(F2) + self.Average_pooling2D_2 = nn.AvgPool2d(kernel_avgpool_2) + # layer 4 + self.Flatten = nn.Flatten() + self.Dense = nn.Linear( + F2*round(signal_length/34), + # 2880, + num_class) + self.Softmax = nn.Softmax(dim= 1) + + def forward(self, x): + # layer 1 + y = self.Batch_normalization_1(self.conv2d(x)) #.relu() + # layer 2 + y = self.Batch_normalization_2(self.Depthwise_conv2D(y)) + y = self.elu(y) + y = self.Dropout(self.Average_pooling2D_1(y)) + # layer 3 + y = self.Separable_conv2D_depth(y) + y = self.Batch_normalization_3(self.Separable_conv2D_point(y)) + y = self.elu(y) + y = self.Dropout(self.Average_pooling2D_2(y)) + # layer 4 + y = self.Flatten(y) + y = self.Dense(y) + y = self.Softmax(y) + + return y + +model001 = EEGNet() \ No newline at end of file diff --git a/split.py b/split.py new file mode 100644 index 0000000..5a2c058 --- /dev/null +++ b/split.py @@ -0,0 +1,8 @@ +# PREPARE LES DOSSIERS EN SEPARANT LES DONNEES 4097 + +import splitfolders +import os + +data_dir = "C:/DATA/M1/Stages/Fablab/Dataset" + +split.ratio(data_dir, output="C:\DATA\M1\Stages\Fablab", seed =123, ratio=(0.8, 0.1, 0.1)) \ No newline at end of file diff --git a/splitfoldersclean.py b/splitfoldersclean.py new file mode 100644 index 0000000..33ccff5 --- /dev/null +++ b/splitfoldersclean.py @@ -0,0 +1,8 @@ +# PREPARE LES DOSSIERS EN SEPARANT LES DONNEES 4096 + +import splitfolders +import os + +data_dir = r"C:/DATA/M1/Stages/Fablab/Dataset" + +splitfolders.ratio(data_dir, output=r"C:/DATA/M1/Stages/Fablab/dataclean", seed =123, ratio=(0.8, 0.1, 0.1)) \ No newline at end of file diff --git a/test.py b/test.py new file mode 100644 index 0000000..e04cb4f --- /dev/null +++ b/test.py @@ -0,0 +1,25 @@ +from traindata import * +from model import * + +# Function to test the model +def test(): + # Load the model that we saved at the end of the training loop + model001 = EEGNet() + data_dir = "C:/DATA/M1/Stages/Fablab/dataclean" + + # path = "NetModel.pth" + model001.load_state_dict(torch.load(data_dir)) + + running_accuracy = 0 + total = 0 + + with torch.no_grad(): + for data in test_loader: + inputs, outputs = data + outputs = outputs.to(torch.float32) + predicted_outputs = model(inputs) + _, predicted = torch.max(predicted_outputs, 1) + total += outputs.size(0) + running_accuracy += (predicted == outputs).sum().item() + + print('Accuracy of the model based on the test set of', test_split ,'inputs is: %d %%' % (100 * running_accuracy / total)) \ No newline at end of file diff --git a/traindata.py b/traindata.py new file mode 100644 index 0000000..0c5729d --- /dev/null +++ b/traindata.py @@ -0,0 +1,147 @@ +### COPIE-COLLE DE TRAINDATA.PY SI LE CODE NE FONCTIONNE PAS + +# LANCER L'ENTRAINEMENT, CALCULE LA LOSS ET L'ACC ET AFFICHE LES GRAPHIQUES ASSOCIES + +from datasettransform import * +from model import * + +from torchmetrics import Accuracy +import torch.nn as nn +import matplotlib.pyplot as plt +import torch.optim as optim +import pandas as pd +import os + +device= 'cpu' + + +### TRAIN FUNCTION ### +def train_one_epoch(model, train_loader, loss_fn, optimizer): + model.train() + loss_train = AverageMeter() + acc_train = Accuracy(task="multiclass", num_classes= num_class).to(device) + + for i, (inputs, targets) in enumerate(train_loader): + inputs = inputs.to(device) + targets = targets.to(device) + + outputs = model(inputs) + loss = loss_fn(outputs, targets) + + loss.backward() + nn.utils.clip_grad_norm_(model.parameters(), 1) + optimizer.step() + optimizer.zero_grad() + + loss_train.update(loss.item()) + acc_train(outputs, targets.int()) + + return model, loss_train.avg, acc_train.compute().item() + +def validation(model, val_loader, loss_fn): + model.eval() + loss_val = AverageMeter() + acc_val = Accuracy(task="multiclass", num_classes= num_class).to(device) + + for i, (inputs, targets) in enumerate(val_loader): + inputs = inputs.to(device) + targets = targets.to(device) + + outputs = model(inputs) + loss = loss_fn(outputs, targets) + + nn.utils.clip_grad_norm_(model.parameters(), 1) + + loss_val.update(loss.item()) + acc_val(outputs, targets.int()) + + return model, loss_val.avg, acc_val.compute().item() + +# rajouter def test ici ? + + +### UTILS ### +class AverageMeter(object): + """Computes and stores the average and current value""" + def __init__(self): + self.reset() + + def reset(self): + self.val = 0 + self.avg = 0 + self.sum = 0 + self.count = 0 + + def update(self, val, n=1): + self.val = val + self.sum += val * n + self.count += n + self.avg = self.sum / self.count + + +### TRAINING ### +loss_fn= nn.CrossEntropyLoss().to(device) +optimizer= optim.NAdam(model001.parameters(), lr= 0.01) + +loss_train_hist = [] +acc_train_hist = [] +loss_val_hist = [] +acc_val_hist = [] + +for epoch in range(EPOCHS): + model, loss_train, acc_train = train_one_epoch(model001, + data_loader, + loss_fn, + optimizer) + model, loss_val, acc_val = validation(model001, + data_loader, + loss_fn) + + loss_train_hist.append(loss_train) + acc_train_hist.append(acc_train) + loss_val_hist.append(loss_val) + acc_val_hist.append(acc_val) + + if (epoch%10== 5)or(epoch%10== 0): + print(f'epoch {epoch}:') + print(f' train loss= {loss_train:.4}, val loss={loss_val:.4}, train acc= {int(acc_train*100)}%, val acc= {int(acc_val*100)}% \n') + + +# Sauvegarder des resultats en csv +dataframe = pd.DataFrame({"loss_train": loss_train_hist, "acc_train": acc_train_hist, "loss_val": loss_val_hist, "acc_val": acc_val_hist}) +n = 0 +dir_export = "./resultats" +for file in os.listdir(dir_export): + if file.endswith(".csv"): + n += 1 +dataframe.to_csv(f"./resultats/model_perf_{n}.csv", sep = ',', index = False) # export du csv + +### COURBE D'APPRENTISSAGE ### +# plt.plot(range(EPOCHS), acc_train_hist, 'b-', label='Train') +# plt.xlabel('Epoch') +# plt.ylabel('Acc') +# plt.grid(True) +# plt.legend() +# plt.title('Evolution de l entraînement (train)') +# plt.show() +# # print(len(acc_train_hist)) + +# ### COURBES LOSS ### +# plt.plot(loss_train_hist, label='Train loss') +# plt.plot(loss_val_hist, label='Val loss') +# plt.xlabel('Epoch') +# plt.ylabel('Loss') +# plt.grid(True) +# plt.legend() +# plt.title('Evolution de la perte (loss)') +# plt.show() + +# ### COURBES ACC ### +# plt.plot(acc_train_hist, label='Train acc') +# plt.plot(acc_val_hist, label='Val acc') +# plt.xlabel('Epoch') +# plt.ylabel('Acc') +# plt.grid(True) +# plt.legend() +# plt.title('Evolution de la précision (acc)') +# plt.show() \ No newline at end of file