Skip to content
Snippets Groups Projects
Commit ddafeb13 authored by Samira Goudarzi's avatar Samira Goudarzi
Browse files

Upload New File

parent 17d88437
No related branches found
No related tags found
No related merge requests found
import numpy as np
import os
from transformers import (BertTokenizerFast,BertForTokenClassification)
import random
# from data_utils import read_txt_file
from tqdm.auto import tqdm
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix
from sklearn.metrics import classification_report
import torch
from torch.utils.data import DataLoader
from transformers import AdamW
# Study on the first 200_000 sentences.
model_dir = "X:/zzz_Samira/Pre-trained Model/bert-base-swedish-cased"

# Tokenizer settings: keep the original casing, register explicit BOS/EOS
# tokens, and read only from the local directory (never download).
tokenizer_options = dict(
    do_lower_case=False,
    eos_token="[EOS]",
    bos_token="[BOS]",
    local_files_only=True,
)
tokenizer = BertTokenizerFast.from_pretrained(model_dir, **tokenizer_options)

# Model settings: two output labels for the token classifier, hidden states
# are returned with every forward pass, and loading is local-only as well.
model_options = dict(
    num_labels=2,
    output_hidden_states=True,
    local_files_only=True,
)
model = BertForTokenClassification.from_pretrained(model_dir, **model_options)
if not os.path.isfile("NER_Journal_Data_only_implant_sentences.npy"):
    # Read the sentence file, dropping empty / whitespace-only lines, and keep
    # only the first 200_000 sentences.
    corpus_file = "../4clinics_fixed_sentences.txt"
    with open(corpus_file, 'r', encoding='utf-8') as file:
        sentences = [line.strip() for line in file if line.strip()]
    sentences = sentences[0:200_000]

    # Read the glossary file (point this path at another glossary if needed),
    # again dropping empty / whitespace-only lines.
    implant_glossary = "X:/29_Aditya_Oskar_NER/Glossary/output_combined_glossary.txt"
    with open(implant_glossary, 'r', encoding='utf-8') as file:
        implant_terms = [line.strip() for line in file if line.strip()]
    print("Number of Implant Terms are : ", len(implant_terms))

    # Inverted index built in one pass over the corpus:
    # lower-cased whitespace token -> ascending indices of the sentences that
    # contain it. This replaces re-splitting every sentence for every term.
    word_to_sentence_ids = {}
    for itw, sent in enumerate(sentences):
        for word in set(sent.lower().split()):
            word_to_sentence_ids.setdefault(word, []).append(itw)

    # data_dict: term -> [number of sentences containing the term,
    #                     those sentences with duplicate texts removed].
    # sentences_with_implants: sentence index -> sentence, for every sentence
    # that contains at least one glossary term (first-seen order is kept, as
    # it determines the order of the dataset built later).
    data_dict = {}
    sentences_with_implants = {}
    for term in tqdm(implant_terms, desc="Generating dictionary of sentences with implants"):
        hit_ids = word_to_sentence_ids.get(term.lower(), [])
        for itw in hit_ids:
            sentences_with_implants[itw] = sentences[itw]
        unique_sentences = list(dict.fromkeys(sentences[itw] for itw in hit_ids))
        data_dict[term] = [len(hit_ids), unique_sentences]

    # Only print the example when the glossary is non-empty; otherwise `term`
    # was never bound by the loop above.
    if implant_terms:
        print(f"As an example the last implant term, '{term}' appears {data_dict[term][0]} times in the list of sentences.")

    # Glossary terms that occur at least once, with their counts and sentences.
    data_dict_non_zero = {key: value for key, value in data_dict.items() if value[0] != 0}
    print(f"You can find {len(data_dict_non_zero)} of the Implant terms in sentences:\n {data_dict_non_zero}")

    # Indices of sentences with and without implant terms.
    idx_list = np.arange(len(sentences)).tolist()
    idxs_sent_with_implants = list(sentences_with_implants.keys())
    idxs_sent_without_implants = list(set(idx_list) - set(idxs_sent_with_implants))
def tokenize_sentence_and_assign_labels(
    example: str, label_all_tokens_same: bool = True, other_label: int = 2
):
    """
    Tokenize one sentence and attach a per-token implant label.

    example : the sentence, as a single string; it is split on whitespace
        into words before tokenization.
    label_all_tokens_same : default is True. If True, every word piece of a
        matching word is labelled 1. If False, only the first word piece is
        labelled 1 and the remaining pieces of that word get `other_label`.
    other_label : default is 2. Label for the non-first word pieces of a
        matching word when `label_all_tokens_same` is False (e.g. 2 or -100).
        NOTE(review): the model in this file is created with num_labels=2, so
        a label of 2 looks out of range for its loss - confirm before using
        label_all_tokens_same=False with the default.

    Returns the tokenizer output (padded / truncated to 100 tokens, PyTorch
    tensors) with an extra "labels" entry: a plain list with one int per
    token position. Positions without a word id (special tokens, padding)
    get -100, pieces of non-matching words get 0.

    A word matches when it equals a glossary term from `implant_terms`,
    ignoring case.
    """
    example_text = example.split()

    # The input is already split into words, so the tokenizer only has to
    # break each word into word pieces and can report word ids per token.
    tokenized_input = tokenizer(
        example_text,
        is_split_into_words=True,
        truncation=True, padding='max_length', max_length=100, return_tensors='pt',
    )
    word_ids = tokenized_input.word_ids()

    # Positions of the words that exactly match a glossary term
    # (case-insensitive). A set lookup replaces the terms x words scan.
    lowered_terms = {term.lower() for term in implant_terms}
    match_word_indices = {
        position
        for position, word in enumerate(example_text)
        if word.lower() in lowered_terms
    }

    label_ids = []
    previous_id = None
    for wid in word_ids:
        if wid is None:
            # No source word behind this position: excluded from the metrics.
            label = -100
        elif wid not in match_word_indices:
            label = 0
        elif wid != previous_id or label_all_tokens_same:
            # First piece of a matching word, or all pieces share the label.
            label = 1
        else:
            # Continuation piece of a matching word.
            label = other_label
        label_ids.append(label)
        previous_id = wid

    tokenized_input["labels"] = label_ids
    return tokenized_input
##########################################################################
# Build the labelled dataset from every sentence that contains an implant
# term and store it on disk. This only happens when the saved dataset file
# does not exist yet (same condition as the corpus scan that produces
# `sentences` and `idxs_sent_with_implants`).
if not os.path.isfile("NER_Journal_Data_only_implant_sentences.npy"):
    JournalNERDataset = []
    for sent_id in tqdm(idxs_sent_with_implants):
        encoded = tokenize_sentence_and_assign_labels(
            sentences[sent_id], label_all_tokens_same=True, other_label=2
        )
        # Tensor fields are stored as nested lists; labels already are a list.
        record = {
            field: encoded[field].tolist()
            for field in ('input_ids', 'token_type_ids', 'attention_mask')
        }
        record['labels'] = encoded['labels']
        JournalNERDataset.append(record)

    # Persist the journal NER data.
    np.save("NER_Journal_Data_only_implant_sentences.npy", JournalNERDataset)
# Load the saved NER data and convert the numpy object array back to a list.
# NOTE(review): allow_pickle=True unpickles arbitrary objects - only load a
# file that this script wrote itself.
JournalNERDataset_loaded = np.load("NER_Journal_Data_only_implant_sentences.npy", allow_pickle=True)
JournalNERDataset_loaded = JournalNERDataset_loaded.tolist()

# Hold-out split: 80% training, 10% validation, the remainder (about 10%) test.
SEED = 1234567
torch.random.manual_seed(SEED)
np.random.seed(SEED)
random.seed(SEED)
n = len(JournalNERDataset_loaded)
train_size = round(80*n/100)
valid_size = round(10*n/100)
train_indx = random.sample(range(n), train_size)
remain_indx = set(range(n)) - set(train_indx)
# random.sample needs a sequence: passing a set raises TypeError on
# Python >= 3.11, and sorting also makes the draw independent of set order.
valid_indx = random.sample(sorted(remain_indx), valid_size)
test_indx = sorted(remain_indx - set(valid_indx))


def _gather_split(indices):
    """Collect the encodings of the given dataset indices into one dict of lists."""
    split = {'input_ids': [], 'token_type_ids': [], 'attention_mask': [], 'labels': []}
    for index in indices:
        entry = JournalNERDataset_loaded[index]
        # The tensor fields were saved with a leading batch dimension of 1,
        # hence the [0]; labels were saved as a flat list.
        split['input_ids'].append(entry['input_ids'][0])
        split['token_type_ids'].append(entry['token_type_ids'][0])
        split['attention_mask'].append(entry['attention_mask'][0])
        split['labels'].append(entry['labels'])
    return split


# Dictionaries holding the training, validation and test data.
training_data = _gather_split(train_indx)
valid_data = _gather_split(valid_indx)
test_data = _gather_split(test_indx)
## Fine_tuning++++++++++++++++++++++++++++++++++++++
class MedicalrecordsDataset(torch.utils.data.Dataset):
    """Dataset over a dict of parallel sequences, one entry per field.

    Item `idx` is a dict with the same keys as `encodings`, where each value
    is the idx-th element of that field converted to a tensor.
    """

    def __init__(self, encodings):
        self.encodings = encodings

    def __getitem__(self, idx):
        sample = {}
        for field_name, column in self.encodings.items():
            sample[field_name] = torch.tensor(column[idx])
        return sample

    def __len__(self):
        # The dataset size is taken from the 'input_ids' field.
        return len(self.encodings['input_ids'])
## Setup GPU/CPU usage: use CUDA when available, otherwise the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model.to(device)

# Test-set metrics computed before the fine-tuning loop runs.
test_dataset = MedicalrecordsDataset(test_data)
test_loader = DataLoader(dataset=test_dataset, shuffle=False, batch_size=16)
print("Size of test dataset:", len(test_dataset))

model.eval()  # Set the model to evaluation mode
pred = []
true_label = []
with torch.no_grad():  # inference only: do not build autograd graphs
    for batch in test_loader:
        batch = {k: v.to(device) for k, v in batch.items()}
        outputs = model(**batch)
        predictions = torch.argmax(outputs.logits, dim=-1)
        # Flatten (batch, tokens) to one long vector per batch.
        true_label.append(batch['labels'].flatten())
        pred.append(predictions.flatten())
true_label = torch.concat(true_label)
pred = torch.concat(pred)

# Drop every position labelled -100 (positions without a word id, i.e.
# special tokens and padding) from both labels and predictions.
test_indices = true_label != -100
test_labels = true_label[test_indices].cpu().numpy()
test_predictions = pred[test_indices].cpu().numpy()
# Prints minus the number of misclassified tokens (correct - total).
print((test_predictions == test_labels).sum() -len(test_labels))

# Performance metrics. labels=[0, 1] pins the matrix to 2x2 so that the
# four-way unpack below also works when one class is missing from the split.
test_confusion = confusion_matrix(test_labels, test_predictions, labels=[0, 1])
print("confusion_matrix\n",test_confusion)
tn, fp, fn, tp = test_confusion.ravel()
print(tn, fp, fn, tp)
print("classification_report",classification_report(test_labels, test_predictions))
# Fine tuning: train on the training split and track the validation loss
# after every epoch.
train_dataset = MedicalrecordsDataset(training_data)
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
valid_dataset = MedicalrecordsDataset(valid_data)
val_loader = DataLoader(dataset=valid_dataset, shuffle=False, batch_size=16)
print("Size of train dataset:", len(train_dataset))
print("Size of valid dataset:", len(valid_dataset))


def _batch_loss(outputs, labels):
    """Return the loss reported by the model, else token-level cross-entropy."""
    if outputs.loss is not None:
        return outputs.loss
    # cross_entropy expects (N, C) logits and (N,) targets, so the token
    # dimension is folded into the batch dimension first.
    logits = outputs.logits
    return torch.nn.functional.cross_entropy(
        logits.reshape(-1, logits.size(-1)), labels.reshape(-1)
    )


def _mean_or_nan(values):
    """Average a list of floats; NaN for an empty list instead of dividing by zero."""
    return sum(values) / len(values) if values else float('nan')


epoch_losses_train = []
epoch_losses_val = []
epochs = 10
optimizer = AdamW(model.parameters(), lr=5e-5)

for epoch in range(epochs):
    temp_loss_train = []  # per-batch training losses of this epoch
    temp_loss_val = []  # per-batch validation losses of this epoch

    # Training pass.
    model.train()
    loop = tqdm(train_loader, leave=True)
    for batch_train in loop:
        batch_train = {k: v.to(device) for k, v in batch_train.items()}
        outputs = model(**batch_train)
        loss = _batch_loss(outputs, batch_train['labels'])
        loss.backward()  # Compute gradients
        optimizer.step()  # Update weights
        optimizer.zero_grad()  # Clear gradients
        temp_loss_train.append(loss.item())

    # Validation pass: evaluation mode and no gradient tracking.
    model.eval()
    with torch.no_grad():
        for batch_val in val_loader:
            batch_val = {k: v.to(device) for k, v in batch_val.items()}
            outputs = model(**batch_val)
            loss = _batch_loss(outputs, batch_val['labels'])
            temp_loss_val.append(loss.item())

    # Average losses of the epoch.
    epoch_losses_train.append(_mean_or_nan(temp_loss_train))
    epoch_losses_val.append(_mean_or_nan(temp_loss_val))

# After training, save the model.
model.save_pretrained('./finedtuned_model_no_class_weight')
# Loss curves (no class weights applied): per-epoch average training and
# validation loss, written to losses.png.
epoch_axis = range(1, epochs + 1)  # epochs on the x-axis, starting at 1
plt.figure(figsize=(10, 5))
for loss_series, legend_name, marker_shape in (
    (epoch_losses_train, 'Training Loss', 'o'),
    (epoch_losses_val, 'Validation Loss', 'x'),
):
    plt.plot(epoch_axis, loss_series, label=legend_name, marker=marker_shape)
plt.title('Training Loss Over Epochs')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.savefig("losses.png")
#plt.show()
# Validation metrics after training.
model.eval()  # Set the model to evaluation mode
pred = []
true_label = []
with torch.no_grad():  # inference only: do not build autograd graphs
    for batch in val_loader:
        batch = {k: v.to(device) for k, v in batch.items()}
        outputs = model(**batch)
        predictions = torch.argmax(outputs.logits, dim=-1)
        # Flatten (batch, tokens) to one long vector per batch.
        true_label.append(batch['labels'].flatten())
        pred.append(predictions.flatten())
true_label = torch.concat(true_label)
pred = torch.concat(pred)

# Filter out `-100` from the true labels and the corresponding predictions.
valid_indices = true_label != -100
valid_labels = true_label[valid_indices].cpu().numpy()
valid_predictions = pred[valid_indices].cpu().numpy()
# Prints minus the number of misclassified tokens (correct - total).
print((valid_predictions == valid_labels).sum() -len(valid_labels))

# Performance metrics. labels=[0, 1] pins the matrix to 2x2 so that the
# four-way unpack below also works when one class is missing from the split.
valid_confusion = confusion_matrix(valid_labels, valid_predictions, labels=[0, 1])
print("confusion_matrix\n",valid_confusion)
tn, fp, fn, tp = valid_confusion.ravel()
print(tn, fp, fn, tp)
# classification_report from Scikit-Learn gives a detailed performance report.
print("classification_report",classification_report(valid_labels, valid_predictions))
# Test metrics after training (batches come from the existing test_loader).
pred = []
true_label = []
test_dataset = MedicalrecordsDataset(test_data)
model.eval()  # Set the model to evaluation mode
with torch.no_grad():  # inference only: do not build autograd graphs
    for batch in test_loader:
        batch = {k: v.to(device) for k, v in batch.items()}
        outputs = model(**batch)
        predictions = torch.argmax(outputs.logits, dim=-1)
        # Flatten (batch, tokens) to one long vector per batch.
        true_label.append(batch['labels'].flatten())
        pred.append(predictions.flatten())
true_label = torch.concat(true_label)
pred = torch.concat(pred)

# Filter out `-100` from the true labels and the corresponding predictions.
test_indices = true_label != -100
test_labels = true_label[test_indices].cpu().numpy()
test_predictions = pred[test_indices].cpu().numpy()
# Prints minus the number of misclassified tokens (correct - total).
print((test_predictions == test_labels).sum() -len(test_labels))

# Performance metrics. labels=[0, 1] pins the matrix to 2x2 so that the
# four-way unpack below also works when one class is missing from the split.
test_confusion = confusion_matrix(test_labels, test_predictions, labels=[0, 1])
print("confusion_matrix\n",test_confusion)
tn, fp, fn, tp = test_confusion.ravel()
print(tn, fp, fn, tp)
# classification_report from Scikit-Learn gives a detailed performance report.
print("classification_report",classification_report(test_labels, test_predictions))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment