First commit

arthurttz 2026-05-13 11:39:59 +02:00
commit 8e99dae59d
8 changed files with 457 additions and 0 deletions

43
datasettransform.py Normal file

@@ -0,0 +1,43 @@
# TRANSFORMS THE .TXT FILES INTO 8x512 MATRICES; THIS SCRIPT IS USED IN TRAINDATA.PY
import torch
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
import os
train_batch_size = 16
data_dir = "C:/DATA/M1/Stages/Fablab/Dataset"
LST2 = []
label = []
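# data_dir must contain one sub-folder per class; the sub-folder index is used as the label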
for ID, sub_dir in enumerate(os.listdir(data_dir)):
for file in os.listdir(os.path.join(data_dir, sub_dir)):
label.append(ID)
        with open(os.path.join(data_dir, sub_dir, file), 'r') as f:
            lst_f = []
            for i in f:
                i = i.replace('\n', '')
                lst_f.append(int(i))
# matrice = np.array(lst_f).reshape(8, 512)
# LST_2D.append(matrice)
matrice = np.array(lst_f[:4096]).reshape(8, 512)
LST2.append(matrice)
# X becomes (number of files, channel, rows, samples), with 1 for the input channel required by EEGNet
X = torch.Tensor(np.array(LST2)).unsqueeze(1)
Y = torch.LongTensor(label)
# build the DataLoader
dataset = TensorDataset(X, Y)
data_loader = DataLoader(dataset, batch_size=train_batch_size, shuffle=True)
print(f'List length: {len(LST2)}')
print(f'Number of files: {len(label)}')
print(f'Shape of the tensor: {X.shape}')
print("train batch size:", data_loader.batch_size,
      ", number of batches:", len(data_loader))

3
graph.py Normal file

@@ -0,0 +1,3 @@
from traindata import loss_train_hist, acc_train_hist, loss_val_hist, acc_val_hist
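# Minimal plotting sketch (assumption: it mirrors the commented-out learning curves at the end of traindata.py; matplotlib is the only extra dependency)
import matplotlib.pyplot as plt

plt.plot(loss_train_hist, label='Train loss')
plt.plot(loss_val_hist, label='Val loss')
plt.plot(acc_train_hist, label='Train acc')
plt.plot(acc_val_hist, label='Val acc')
plt.xlabel('Epoch')
plt.grid(True)
plt.legend()
plt.title('Training history')
plt.show()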

124
inspi.py Normal file

@@ -0,0 +1,124 @@
# CONTAINS A VERSION OF THE MODEL AND A CUSTOM DATASET;
# I CAN DRAW ON IT TO CLEAN UP DATASETTRANSFORM.PY AND READ THE FILES
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import numpy as np
import matplotlib.pyplot as plt
from torchmetrics import Accuracy
import os
# --- Global parameters ---
BATCH_SIZE = 8
EPOCHS = 50
fs = 173
channel = 1
num_input = 1
num_class = 5
signal_length = 4097
device = 'cpu'
# --- EEGNet architecture (unchanged) ---
F1, D = 8, 3
F2 = D * F1
class EEGNet(nn.Module):
def __init__(self):
super().__init__()
self.conv2d = nn.Conv2d(num_input, F1, (1, round(fs/2)), padding=(0, round(fs/4)-1))
self.Batch_normalization_1 = nn.BatchNorm2d(F1)
self.Depthwise_conv2D = nn.Conv2d(F1, D*F1, (channel, 1), groups=F1)
self.Batch_normalization_2 = nn.BatchNorm2d(D*F1)
self.Elu = nn.ELU()
self.Average_pooling2D_1 = nn.AvgPool2d((1, 4))
self.Dropout = nn.Dropout2d(0.2)
self.Separable_conv2D_depth = nn.Conv2d(D*F1, D*F1, (1, round(fs/8)), padding=(0, round(fs/16)), groups=D*F1)
self.Separable_conv2D_point = nn.Conv2d(D*F1, F2, (1, 1))
self.Batch_normalization_3 = nn.BatchNorm2d(F2)
self.Average_pooling2D_2 = nn.AvgPool2d((1, 8))
self.Flatten = nn.Flatten()
        # Dynamically compute the output size for the Dense layer
self.Dense = nn.Linear(F2 * (signal_length // 32), num_class)
self.Softmax = nn.Softmax(dim=1)
def forward(self, x):
y = self.Batch_normalization_1(self.conv2d(x))
y = self.Batch_normalization_2(self.Depthwise_conv2D(y))
y = self.Elu(y)
y = self.Dropout(self.Average_pooling2D_1(y))
y = self.Separable_conv2D_depth(y)
y = self.Batch_normalization_3(self.Separable_conv2D_point(y))
y = self.Elu(y)
y = self.Dropout(self.Average_pooling2D_2(y))
y = self.Flatten(y)
return self.Softmax(self.Dense(y))
# --- Dataset adapted to your folder layout ---
class EEGDataset(torch.utils.data.Dataset):
def __init__(self, root_path, signal_length=4097):
self.root_path = root_path
self.signal_length = signal_length
self.class_map = {'F': 0, 'N': 1, 'O': 2, 'S': 3, 'Z': 4}
self.samples = []
        # Recursively walk root_path (e.g. data/train/) to find the .txt files in F, N, O, S, Z
for class_name in self.class_map.keys():
class_folder = os.path.join(root_path, class_name)
if os.path.exists(class_folder):
for f in os.listdir(class_folder):
if f.lower().endswith('.txt'):
self.samples.append((os.path.join(class_folder, f), self.class_map[class_name]))
def __len__(self):
return len(self.samples)
def __getitem__(self, idx):
file_path, label = self.samples[idx]
signal = []
with open(file_path, 'r') as f:
for line in f:
line = line.strip()
if line and not line.startswith('['):
try:
signal.append(float(line))
except ValueError: continue
        # Pad or truncate so the signal has exactly signal_length samples
if len(signal) < self.signal_length:
signal.extend([0.0] * (self.signal_length - len(signal)))
else:
signal = signal[:self.signal_length]
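        # shape (1, 1, signal_length): input-channel and electrode dims; the DataLoader adds the batch dimension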
x = torch.tensor(signal, dtype=torch.float32).view(1, 1, -1)
return x, torch.tensor(label, dtype=torch.long)
# --- Data initialization ---
data_root = "C:/DATA/M1/Stages/Fablab/data"
train_loader = DataLoader(EEGDataset(os.path.join(data_root, 'train')),
batch_size=BATCH_SIZE,
shuffle=True)
val_loader = DataLoader(EEGDataset(os.path.join(data_root, 'val')),
batch_size=BATCH_SIZE,
shuffle=False)
test_loader = DataLoader(EEGDataset(os.path.join(data_root, 'test')),
batch_size=BATCH_SIZE,
shuffle=False)
# --- Visualization function ---
def plot_eeg_samples(loader, n_samples=3):
samples, labels = next(iter(loader))
classes_names = {0: 'F', 1: 'N', 2: 'O', 3: 'S', 4: 'Z'}
plt.figure(figsize=(12, 8))
for i in range(min(n_samples, len(samples))):
plt.subplot(n_samples, 1, i+1)
plt.plot(samples[i].view(-1).numpy())
plt.title(f"Classe: {classes_names[labels[i].item()]}")
plt.ylabel("Amplitude")
plt.tight_layout()
plt.show()
# Visualization test
plot_eeg_samples(train_loader)

99
model.py Normal file

@@ -0,0 +1,99 @@
# EEGNET MODEL ARCHITECTURE
import torch
import torchvision
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from torchmetrics import Accuracy
import torch.utils.data as data
#print(torch.cuda.is_available())
# torch.cuda.get_device_name(0)
### SET MODEL SPECIFICATIONS ###
train_batch_size = 8
EPOCHS = 50
fs= 173 #sampling frequency
channel= 8 #number of electrodes
num_input= 1 #number of image channels (for an EEG signal this is always 1)
num_class= 5 #number of classes
signal_length = 512 #number of samples in each trial
F1= 8 #number of temporal filters
D= 3 #depth multiplier (number of spatial filters)
F2= D*F1 #number of pointwise filters
device= 'cpu'
kernel_size_1= (1,round(fs/2))
kernel_size_2= (channel, 1)
kernel_size_3= (1, round(fs/8))
kernel_size_4= (1, 1)
kernel_avgpool_1= (1,4)
kernel_avgpool_2= (1,8)
dropout_rate= 0
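# near-'same' paddings computed from the kernel sizes, so the temporal width is roughly preserved before each pooling stage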
ks0= int(round((kernel_size_1[0]-1)/2))
ks1= int(round((kernel_size_1[1]-1)/2))
kernel_padding_1= (ks0, ks1-1)
ks0= int(round((kernel_size_3[0]-1)/2))
ks1= int(round((kernel_size_3[1]-1)/2))
kernel_padding_3= (ks0, ks1)
### DESIGN MODEL ###
class EEGNet(nn.Module):
def __init__(self):
super().__init__()
# layer 1
self.conv2d = nn.Conv2d(num_input, F1, kernel_size_1, padding=kernel_padding_1)
self.Batch_normalization_1 = nn.BatchNorm2d(F1)
# layer 2
self.Depthwise_conv2D = nn.Conv2d(F1, D*F1, kernel_size_2, groups= F1)
self.Batch_normalization_2 = nn.BatchNorm2d(D*F1)
self.elu = nn.ELU()
self.Average_pooling2D_1 = nn.AvgPool2d(kernel_avgpool_1)
self.Dropout = nn.Dropout2d(dropout_rate)
# layer 3
self.Separable_conv2D_depth = nn.Conv2d( D*F1, D*F1, kernel_size_3,
padding=kernel_padding_3, groups= D*F1)
self.Separable_conv2D_point = nn.Conv2d(D*F1, F2, kernel_size_4)
self.Batch_normalization_3 = nn.BatchNorm2d(F2)
self.Average_pooling2D_2 = nn.AvgPool2d(kernel_avgpool_2)
# layer 4
self.Flatten = nn.Flatten()
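        # flattened size: F2 filters x the temporal width left after both average-pooling stages
        # (round(signal_length/34) = 15 for signal_length=512 with these kernel and pooling sizes)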
self.Dense = nn.Linear(
F2*round(signal_length/34),
# 2880,
num_class)
self.Softmax = nn.Softmax(dim= 1)
def forward(self, x):
# layer 1
y = self.Batch_normalization_1(self.conv2d(x)) #.relu()
# layer 2
y = self.Batch_normalization_2(self.Depthwise_conv2D(y))
y = self.elu(y)
y = self.Dropout(self.Average_pooling2D_1(y))
# layer 3
y = self.Separable_conv2D_depth(y)
y = self.Batch_normalization_3(self.Separable_conv2D_point(y))
y = self.elu(y)
y = self.Dropout(self.Average_pooling2D_2(y))
# layer 4
y = self.Flatten(y)
y = self.Dense(y)
y = self.Softmax(y)
return y
model001 = EEGNet()

8
split.py Normal file

@@ -0,0 +1,8 @@
# PREPARES THE FOLDERS BY SPLITTING THE 4097-SAMPLE DATA
import splitfolders
import os
data_dir = "C:/DATA/M1/Stages/Fablab/Dataset"
splitfolders.ratio(data_dir, output=r"C:/DATA/M1/Stages/Fablab", seed=123, ratio=(0.8, 0.1, 0.1))

8
splitfoldersclean.py Normal file

@@ -0,0 +1,8 @@
# PREPARES THE FOLDERS BY SPLITTING THE 4096-SAMPLE DATA
import splitfolders
import os
data_dir = r"C:/DATA/M1/Stages/Fablab/Dataset"
splitfolders.ratio(data_dir, output=r"C:/DATA/M1/Stages/Fablab/dataclean", seed=123, ratio=(0.8, 0.1, 0.1))

25
test.py Normal file

@@ -0,0 +1,25 @@
from traindata import *
from model import *
# Function to test the model
def test():
# Load the model that we saved at the end of the training loop
model001 = EEGNet()
data_dir = "C:/DATA/M1/Stages/Fablab/dataclean"
# path = "NetModel.pth"
model001.load_state_dict(torch.load(data_dir))
running_accuracy = 0
total = 0
with torch.no_grad():
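        # NOTE: test_loader must come from a train/val/test split; datasettransform.py only builds data_loader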
for data in test_loader:
inputs, outputs = data
outputs = outputs.to(torch.float32)
            predicted_outputs = model001(inputs)
_, predicted = torch.max(predicted_outputs, 1)
total += outputs.size(0)
running_accuracy += (predicted == outputs).sum().item()
    print('Accuracy of the model based on the test set of', total, 'inputs is: %d %%' % (100 * running_accuracy / total))

147
traindata.py Normal file

@@ -0,0 +1,147 @@
### COPY-PASTE OF TRAINDATA.PY TO FALL BACK ON IF THE CODE STOPS WORKING
# RUNS THE TRAINING, COMPUTES THE LOSS AND ACCURACY, AND PLOTS THE ASSOCIATED GRAPHS
from datasettransform import *
from model import *
from torchmetrics import Accuracy
import torch.nn as nn
import matplotlib.pyplot as plt
import torch.optim as optim
import pandas as pd
import os
device= 'cpu'
### TRAIN FUNCTION ###
def train_one_epoch(model, train_loader, loss_fn, optimizer):
model.train()
loss_train = AverageMeter()
acc_train = Accuracy(task="multiclass", num_classes= num_class).to(device)
for i, (inputs, targets) in enumerate(train_loader):
inputs = inputs.to(device)
targets = targets.to(device)
outputs = model(inputs)
loss = loss_fn(outputs, targets)
loss.backward()
nn.utils.clip_grad_norm_(model.parameters(), 1)
optimizer.step()
optimizer.zero_grad()
loss_train.update(loss.item())
acc_train(outputs, targets.int())
return model, loss_train.avg, acc_train.compute().item()
def validation(model, val_loader, loss_fn):
    model.eval()
    loss_val = AverageMeter()
    acc_val = Accuracy(task="multiclass", num_classes= num_class).to(device)
    # no gradients are needed during validation
    with torch.no_grad():
        for i, (inputs, targets) in enumerate(val_loader):
            inputs = inputs.to(device)
            targets = targets.to(device)
            outputs = model(inputs)
            loss = loss_fn(outputs, targets)
            loss_val.update(loss.item())
            acc_val(outputs, targets.int())
    return model, loss_val.avg, acc_val.compute().item()
# add a test() function here?
### UTILS ###
class AverageMeter(object):
"""Computes and stores the average and current value"""
def __init__(self):
self.reset()
def reset(self):
self.val = 0
self.avg = 0
self.sum = 0
self.count = 0
def update(self, val, n=1):
self.val = val
self.sum += val * n
self.count += n
self.avg = self.sum / self.count
### TRAINING ###
loss_fn= nn.CrossEntropyLoss().to(device)
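# NOTE: CrossEntropyLoss expects raw logits, but EEGNet.forward already applies Softmax, so the outputs are softmaxed a second time inside the loss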
optimizer= optim.NAdam(model001.parameters(), lr= 0.01)
loss_train_hist = []
acc_train_hist = []
loss_val_hist = []
acc_val_hist = []
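# NOTE: the same data_loader from datasettransform.py is currently used for both training and validation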
for epoch in range(EPOCHS):
model, loss_train, acc_train = train_one_epoch(model001,
data_loader,
loss_fn,
optimizer)
model, loss_val, acc_val = validation(model001,
data_loader,
loss_fn)
loss_train_hist.append(loss_train)
acc_train_hist.append(acc_train)
loss_val_hist.append(loss_val)
acc_val_hist.append(acc_val)
    if (epoch % 10 == 5) or (epoch % 10 == 0):
print(f'epoch {epoch}:')
print(f' train loss= {loss_train:.4}, val loss={loss_val:.4}, train acc= {int(acc_train*100)}%, val acc= {int(acc_val*100)}% \n')
# Save the results to csv
dataframe = pd.DataFrame({"loss_train": loss_train_hist, "acc_train": acc_train_hist, "loss_val": loss_val_hist, "acc_val": acc_val_hist})
n = 0
dir_export = "./resultats"
os.makedirs(dir_export, exist_ok=True)  # make sure the results folder exists
for file in os.listdir(dir_export):
    if file.endswith(".csv"):
        n += 1
dataframe.to_csv(f"{dir_export}/model_perf_{n}.csv", sep=',', index=False)  # export the csv
### LEARNING CURVE ###
# plt.plot(range(EPOCHS), acc_train_hist, 'b-', label='Train')
# plt.xlabel('Epoch')
# plt.ylabel('Acc')
# plt.grid(True)
# plt.legend()
# plt.title('Training progress (train)')
# plt.show()
# # print(len(acc_train_hist))
# ### LOSS CURVES ###
# plt.plot(loss_train_hist, label='Train loss')
# plt.plot(loss_val_hist, label='Val loss')
# plt.xlabel('Epoch')
# plt.ylabel('Loss')
# plt.grid(True)
# plt.legend()
# plt.title('Loss over epochs')
# plt.show()
# ### ACCURACY CURVES ###
# plt.plot(acc_train_hist, label='Train acc')
# plt.plot(acc_val_hist, label='Val acc')
# plt.xlabel('Epoch')
# plt.ylabel('Acc')
# plt.grid(True)
# plt.legend()
# plt.title('Accuracy over epochs')
# plt.show()