# File: methods/model.py
# (The same change also adds an empty methods/__init__.py.)
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F


class SentimentAspectCNN(nn.Module):
    """Multi-width 1-D CNN that scores a sentence matrix between 0 and 1.

    One ``Conv2d`` is built per entry of ``filter_sizes``; each kernel spans
    ``fs`` consecutive token rows and the full feature width.  The ReLU
    responses are max-pooled over the token axis, concatenated, passed
    through dropout and a linear layer, and squashed with a sigmoid.

    Args:
        embedding_dim: Width of each token row as fed to the model.  This is
            also the kernel width, so it must already include any extra
            feature columns (the demo below uses 26 = 25 embedding values
            + 1 aspect indicator).
        num_filters: Output channels of every convolution.
        filter_sizes: Kernel heights, i.e. how many consecutive tokens each
            convolution looks at.
        output_dim: Size of the final linear layer's output.
        dropout: Dropout probability applied to the pooled features.
    """

    def __init__(self, embedding_dim, num_filters, filter_sizes, output_dim, dropout):
        super().__init__()
        self.convs = nn.ModuleList([
            nn.Conv2d(in_channels=1, out_channels=num_filters, kernel_size=(fs, embedding_dim))
            for fs in filter_sizes
        ])
        self.fc = nn.Linear(len(filter_sizes) * num_filters, output_dim)
        self.dropout = nn.Dropout(dropout)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Score a batch of sentence matrices.

        Args:
            x: Tensor of shape ``(batch_size, max_length, embedding_dim)``.

        Returns:
            Tensor of shape ``(batch_size, output_dim)`` with values in [0, 1].
        """
        # A sequence shorter than the tallest kernel makes conv2d raise, so
        # append all-zero token rows until every kernel fits at least once.
        tallest_kernel = max(conv.kernel_size[0] for conv in self.convs)
        missing_rows = tallest_kernel - x.size(1)
        if missing_rows > 0:
            x = F.pad(x, (0, 0, 0, missing_rows))

        # Add a channel dimension: (batch_size, 1, max_length, embedding_dim)
        x = x.unsqueeze(1)

        pooled = []
        for conv in self.convs:
            # The kernel covers the whole feature width, so the last axis has
            # size 1 and is dropped:
            # (batch_size, num_filters, max_length - filter_size + 1)
            feature_map = F.relu(conv(x)).squeeze(3)
            # Max over the token axis: (batch_size, num_filters)
            pooled.append(F.max_pool1d(feature_map, feature_map.size(2)).squeeze(2))

        # (batch_size, len(filter_sizes) * num_filters)
        features = self.dropout(torch.cat(pooled, dim=1))

        # Linear layer followed by a sigmoid: (batch_size, output_dim)
        return self.sigmoid(self.fc(features))


if __name__ == "__main__":
    embedding_dim = 26  # 25 for word embeddings + 1 for aspect indicator
    num_filters = 100
    filter_sizes = [3, 4, 5]
    output_dim = 1
    dropout = 0.5

    model = SentimentAspectCNN(embedding_dim, num_filters, filter_sizes, output_dim, dropout)
    print(model)
# File: methods/tokenizer.py
from typing import Optional, Tuple, Union

import gensim
import gensim.downloader
import torch
from loguru import logger
from sawaw import SAWAWEntry, SentimentResult


class GensimTokenizer:
    """Turns words and sentences into GloVe vectors via gensim."""

    def __init__(self):
        # Loads the 'glove-twitter-25' vectors through gensim's downloader.
        glove_vectors = gensim.downloader.load('glove-twitter-25')
        self.gensim_model: gensim.models.keyedvectors.KeyedVectors = glove_vectors

    def word2vec(self, word: str, as_torch_tensor: bool = False, zero_if_not_found: bool = True):
        """Look up the embedding of ``word``.

        Args:
            word: Token to look up.
            as_torch_tensor: Convert the result to a ``torch.Tensor``.
            zero_if_not_found: For an unknown word, return an all-zero vector
                instead of ``None``.

        Returns:
            The stored vector for a known word; for an unknown word a list of
            zeros, or ``None`` when ``zero_if_not_found`` is false.  ``None``
            is returned as-is even when ``as_torch_tensor`` is true.
        """
        if word in self.gensim_model:
            ret = self.gensim_model[word]
        elif zero_if_not_found:
            # Float zeros so the tensor dtype matches the real embeddings
            # (integer zeros would yield an int64 tensor).
            ret = [0.0] * self.gensim_model.vector_size
        else:
            return None
        if as_torch_tensor:
            ret = torch.tensor(ret)
        return ret

    def sentence2vec(self, sentence: str, pad_to_len: Optional[int] = None) -> torch.Tensor:
        """Embed every token of ``sentence``.

        The sentence is tokenized with ``gensim.utils.simple_preprocess``;
        unknown tokens become zero vectors.

        Args:
            sentence: Raw sentence text.
            pad_to_len: When given, zero-pad or truncate the result to exactly
                this many rows.

        Returns:
            Tensor of shape ``(num_of_words, vector_size)``, or
            ``(pad_to_len, vector_size)`` when ``pad_to_len`` is given.
        """
        dim = self.gensim_model.vector_size
        list_of_words = gensim.utils.simple_preprocess(sentence)
        if list_of_words:
            vec_result = torch.stack([
                self.word2vec(word, as_torch_tensor=True, zero_if_not_found=True)
                for word in list_of_words
            ])  # shape: (num_of_words, vector_size)
        else:
            # torch.stack rejects an empty list; represent "no tokens" explicitly.
            vec_result = torch.zeros((0, dim))

        if pad_to_len is None:
            return vec_result

        num_words = vec_result.shape[0]
        if num_words < pad_to_len:
            vec_result = torch.cat([vec_result, torch.zeros((pad_to_len - num_words, dim))])
        elif num_words > pad_to_len:
            vec_result = vec_result[:pad_to_len]
            logger.warning("Dropping words after '{}' from sentence '{}'", list_of_words[pad_to_len], sentence)
        return vec_result


def _to_vec(sentence: str, aspect_word: str, tokenizer: GensimTokenizer, max_len: int = 25) -> torch.Tensor:
    """Embed ``sentence`` and append a 0/1 column marking aspect tokens.

    Args:
        sentence: Raw sentence text.
        aspect_word: Aspect term; it is tokenized the same way as the
            sentence, and every sentence token equal to one of its tokens is
            marked.
        tokenizer: Tokenizer providing ``sentence2vec``.
        max_len: Number of token rows in the result.

    Returns:
        Tensor of shape ``(max_len, vector_size + 1)``.
    """
    # Tokenize and convert sentence to vectors
    sentence_vec = tokenizer.sentence2vec(sentence, pad_to_len=max_len)  # shape: (max_len, vector_size)

    # Preprocess and tokenize aspect words
    aspect_tokens = set(gensim.utils.simple_preprocess(aspect_word))
    aspect_indicators = torch.zeros(max_len, 1)

    # Mark aspect words among the tokens that survive truncation
    for i, word in enumerate(gensim.utils.simple_preprocess(sentence)[:max_len]):
        if word in aspect_tokens:
            aspect_indicators[i] = 1

    # Concatenate the sentence vectors with the aspect indicators
    return torch.cat((sentence_vec, aspect_indicators), dim=1)


# Shared tokenizer used by to_vec; note that it is created at import time.
gt = GensimTokenizer()


def _sentiment_target(sent, aspect_word: str) -> float:
    """Map a ``SentimentResult`` to the training target used for it."""
    if sent == SentimentResult.UNDEFINED:
        logger.warning("Sentiment result for aspect word '{}' is undefined, but to_vec is called with should_return_sentiment=True. Assuming neutral.", aspect_word)
        return 0.5
    if sent == SentimentResult.NEGATIVE:
        return 0.0
    if sent == SentimentResult.POSITIVE:
        return 1.0
    if sent == SentimentResult.NEUTRAL or sent == SentimentResult.NONE:
        return 0.5
    raise ValueError(f"Unsupported sentiment result {sent!r} for aspect word {aspect_word!r}")


def to_vec(
    entry: SAWAWEntry,
    max_len: int = 80,
    should_return_sentiment: bool = True,
) -> Union[torch.Tensor, Tuple[torch.Tensor, torch.Tensor]]:
    """Vectorize ``entry.comment`` once per aspect word.

    Args:
        entry: Entry providing ``comment``, ``aspect_words`` and, when
            sentiments are requested, ``sentiment_results``.
        max_len: Number of token rows per encoded sentence.
        should_return_sentiment: Also return the sentiment targets.

    Returns:
        A tensor of shape ``(num_of_aspect_words, max_len, vector_size + 1)``.
        When ``should_return_sentiment`` is true, a tuple of that tensor and
        a float tensor of shape ``(num_of_aspect_words,)`` holding one target
        per aspect word (0 negative, 1 positive, 0.5 neutral/none/undefined).

    Raises:
        ValueError: If a sentiment result is not one of the handled values.
    """
    encoded_sentences = []
    targets = []
    for i, aspect_word in enumerate(entry.aspect_words):
        encoded_sentences.append(_to_vec(entry.comment, aspect_word, gt, max_len=max_len))
        if should_return_sentiment:
            # Collect one target per aspect word instead of keeping only the last.
            targets.append(_sentiment_target(entry.sentiment_results[i], aspect_word))

    if encoded_sentences:
        stacked = torch.stack(encoded_sentences)
    else:
        # torch.stack rejects an empty list; keep the documented rank instead.
        stacked = torch.zeros((0, max_len, gt.gensim_model.vector_size + 1))

    if should_return_sentiment:
        return stacked, torch.tensor(targets, dtype=torch.float32)
    return stacked


if __name__ == '__main__':
    tokenizer = gt  # reuse the vectors that were already loaded above
    sentence = "The pizza at this restaurant is amazing, but the service is slow."
    aspect_words = "service"
    # _to_vec is the helper that takes (sentence, aspect_word, tokenizer).
    vectorized_sentence = _to_vec(sentence, aspect_words, tokenizer)
    print(vectorized_sentence.shape)  # should be (25, 26)
# File: scripts/train_cnn.py
# The accompanying pyproject.toml change adds these dependencies:
#   gensim = "^4.3.2"
#   tqdm = "^4.62.3"
#   torch = "^1.13.1,<2.0.0"
from pathlib import Path

import numpy as np
import torch
from loguru import logger
from sawaw import SAWAWEntry, SentimentResult
from torch.utils.data import DataLoader, TensorDataset
from tqdm import tqdm

from methods.model import SentimentAspectCNN
from methods.tokenizer import to_vec

# Load the data from semeval dataset
path = Path("./data/restaurant_train.raw")
content = path.read_text()


def parse_content(content: str):
    """Parse three-line records into ``SAWAWEntry`` objects.

    Each record is a sentence containing the ``$T$`` placeholder, the aspect
    term, and an integer sentiment label, e.g.::

        I 'm partial to the $T$ .
        Gnocchi
        1

    The placeholder is replaced by the aspect term and the label is shifted
    by one before being passed to ``SentimentResult``.  A trailing incomplete
    record is ignored.
    """
    lines = content.split("\n")
    entries = []
    # Stop as soon as fewer than three lines remain.
    for start in range(0, len(lines) - 2, 3):
        sentence, aspect_word, sentiment = lines[start:start + 3]
        sentence_replaced = sentence.replace("$T$", aspect_word)
        entries.append(SAWAWEntry(sentence_replaced, [aspect_word], [SentimentResult(int(sentiment) + 1)]))
    return entries


entries = parse_content(content)
logger.info("Loaded {} entries from {}", len(entries), path)

# Vectorize every entry
max_len = 80
data_vectors, sentiment_gts = [], []
for entry in tqdm(entries):
    # data_vector shape: (num_of_aspect_words, 80, 26)
    data_vector, sentiment_gt = to_vec(entry, max_len=max_len, should_return_sentiment=True)
    data_vectors.append(data_vector)
    # Flatten so a scalar label and a per-aspect label vector are both accepted.
    sentiment_gts.append(torch.as_tensor(sentiment_gt, dtype=torch.float32).reshape(-1))

data_vectors = torch.cat(data_vectors, dim=0)
sentiment_gts = torch.cat(sentiment_gts).unsqueeze(1)  # shape: (num_of_aspect_words, 1)

# Train the model
embedding_dim = 26  # 25 for word embeddings + 1 for aspect indicator
num_filters = 88
filter_sizes = [3, 4, 3]
output_dim = 1
dropout = 0.2

model = SentimentAspectCNN(embedding_dim, num_filters, filter_sizes, output_dim, dropout)
model.train()
optimizer = torch.optim.Adam(model.parameters())
criterion = torch.nn.BCELoss()

batch_size = 16
dataset = TensorDataset(data_vectors, sentiment_gts)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

try:
    epochs = 100
    for epoch in range(epochs):
        epoch_loss = 0
        # Batch-local names keep the full dataset tensors above from being rebound.
        for batch_vectors, batch_targets in tqdm(dataloader):
            optimizer.zero_grad()
            outputs = model(batch_vectors)
            loss = criterion(outputs, batch_targets)
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()
        logger.info("Epoch {}: loss={}", epoch, epoch_loss)

except KeyboardInterrupt:
    # An interrupted run still falls through to saving and evaluation.
    logger.info("Training stopped by user")

# Save the model
model_path = "./data/model.pt"
torch.save(model.state_dict(), model_path)
logger.info("Model saved to {}", model_path)

# Test the model to find the best threshold
# NOTE: this reuses the training dataloader, so the accuracy is measured on
# the training data.  Targets of 0.5 never equal a thresholded prediction
# and therefore always count as incorrect.
model.eval()

best_threshold, best_accuracy = None, -1.0
with torch.no_grad():  # evaluation needs no gradient tracking
    for threshold in np.arange(0.1, 1, 0.1):
        logger.info("Testing with threshold={}", threshold)
        num_correct = 0
        num_total = 0
        for batch_vectors, batch_targets in tqdm(dataloader):
            predictions = model(batch_vectors) > threshold
            num_correct += torch.sum(predictions == batch_targets).item()
            num_total += len(batch_targets)
        accuracy = num_correct / num_total
        logger.info("Accuracy: {}", accuracy)
        if accuracy > best_accuracy:
            best_threshold, best_accuracy = threshold, accuracy

logger.info("Best threshold={} (accuracy={})", best_threshold, best_accuracy)