| | from cybersecurity_knowledge_graph.event_arg_role_dataloader import EventArgumentRoleDataset |
| | from cybersecurity_knowledge_graph.utils import arg_2_role |
| |
|
| | import os |
| | from transformers import AutoTokenizer |
| | import optuna |
| | from sklearn.model_selection import StratifiedKFold |
| | from sklearn.model_selection import cross_val_score |
| | from sklearn.metrics import make_scorer, f1_score |
| | from sklearn.ensemble import VotingClassifier |
| | from sklearn.linear_model import LogisticRegression |
| | from sklearn.neural_network import MLPClassifier |
| | from sklearn.svm import SVC |
| | from joblib import dump, load |
| | from sentence_transformers import SentenceTransformer |
| | import numpy as np |
| |
|
# Sentence encoder used to embed sentence and argument text at inference time.
embed_model = SentenceTransformer("all-MiniLM-L6-v2")

# Tokenizer of the SecureBERT checkpoint; it is handed to the dataset in fit().
model_checkpoint = "ehsanaghaei/SecureBERT"
tokenizer = AutoTokenizer.from_pretrained(model_checkpoint, add_prefix_space=True)

# Directory holding the serialized classifiers. It is appended verbatim to the
# current working directory, which is why it starts with a slash.
folder_path = '/cybersecurity_knowledge_graph/arg_role_models'

# Maps a model file's name (the part before its first dot) to the loaded
# classifier. Loading happens once, at import time.
# NOTE(review): joblib.load unpickles its input, so only trusted model files
# should ever be placed in this directory.
classifiers = {}
models_dir = os.getcwd() + folder_path
for model_file in os.listdir(models_dir):
    if not model_file.endswith('.joblib'):
        continue
    model_key = model_file.split(".")[0]
    classifiers[model_key] = load(os.path.join(models_dir, model_file))
| |
|
def _build_voting_classifier(svc_c, svc_kernel):
    """Return an unfitted hard-voting ensemble of LR, MLP and an SVC(C, kernel)."""
    return VotingClassifier(
        estimators=[
            ('Logistic Regression', LogisticRegression()),
            ('Neural Network', MLPClassifier(max_iter=500)),
            ('Support Vector Machine', SVC(C=svc_c, kernel=svc_kernel)),
        ],
        voting='hard',
    )


def fit(data_path="./data/annotation/", n_trials=20, output_dir=None):
    """Train and save one role classifier per argument type with several roles.

    For every entry of ``arg_2_role`` that has more than one role, the
    annotated dataset is loaded, Optuna searches the SVC hyperparameters of a
    hard-voting ensemble (scored by weighted F1 under 5-fold stratified
    cross-validation), and the best configuration is refitted and dumped as
    ``<arg>.joblib``. Argument types with a single role are skipped because
    they need no classifier.

    Note that both the cross-validation and the final fit use all of
    ``dataset.data``; the result of ``train_val_test_split()`` is not consumed
    here.

    Args:
        data_path: Directory of the annotation data passed to
            ``EventArgumentRoleDataset``.
        n_trials: Number of Optuna trials run per argument type.
        output_dir: Directory the ``.joblib`` files are written to. ``None``
            (the default) writes them relative to the current working
            directory. The module-level loader in this file reads models from
            ``os.getcwd() + folder_path``, so pass that directory here to have
            freshly trained models picked up on the next import.
    """
    # Neither object depends on the argument type or the trial, so build once.
    f1_scorer = make_scorer(f1_score, average="weighted")
    stratified_kfold = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

    if output_dir is not None:
        os.makedirs(output_dir, exist_ok=True)

    for arg, roles in arg_2_role.items():
        if len(roles) <= 1:
            continue

        dataset = EventArgumentRoleDataset(path=data_path, tokenizer=tokenizer, arg=arg)
        dataset.load_data()
        # NOTE(review): kept for parity with the previous behavior; whether it
        # has side effects on dataset.data cannot be told from this file.
        dataset.train_val_test_split()

        X = [datapoint["embedding"] for datapoint in dataset.data]
        # Labels are encoded as the index of the role in this argument's role list.
        y = [roles.index(datapoint["label"]) for datapoint in dataset.data]

        # X and y are bound as defaults so the closure cannot pick up data
        # from a later loop iteration.
        def objective(trial, X=X, y=y):
            # Single-choice parameter, still suggested so that it is recorded
            # in the trial's params as before.
            trial.suggest_categorical("classifier", ["voting"])
            svc_c = trial.suggest_float("svc_c", 1e-3, 1e3, log=True)
            svc_kernel = trial.suggest_categorical("kernel", ['rbf'])
            candidate = _build_voting_classifier(svc_c, svc_kernel)
            cv_scores = cross_val_score(candidate, X, y, cv=stratified_kfold, scoring=f1_scorer)
            return cv_scores.mean()

        study = optuna.create_study(direction="maximize")
        study.optimize(objective, n_trials=n_trials)
        print(f"{arg} : {study.best_trial.values[0]}")

        best_params = study.best_trial.params
        best_clf = _build_voting_classifier(best_params["svc_c"], best_params["kernel"])
        best_clf.fit(X, y)

        model_file = f'{arg}.joblib'
        if output_dir is not None:
            model_file = os.path.join(output_dir, model_file)
        dump(best_clf, model_file)
| |
|
def _find_context_text(arg, sentences, doc):
    """Return the text used as classification context for one argument."""
    start, end = arg["startOffset"], arg["endOffset"]

    # Preferred: the first sentence that fully contains the argument span.
    for sent in sentences:
        if start >= sent.start_char and end <= sent.end_char:
            return sent.text

    # The span crosses a sentence boundary: use the sentence it starts in.
    for sent in sentences:
        if sent.start_char <= start < sent.end_char:
            return sent.text

    # Offsets match no sentence at all: fall back to the whole document.
    return doc.text


def get_arg_roles(event_args, doc):
    """Assign a role to every event argument, in place.

    Arguments whose subtype has exactly one possible role get that role
    directly. For the others, the embedding of the surrounding sentence is
    concatenated with the embedding of the argument text and passed to the
    classifier loaded for that subtype; the predicted index selects the role
    from ``arg_2_role[subtype]``.

    If no single sentence fully contains the argument span, the sentence that
    contains the span's start is used as context, and failing that the whole
    document text. Previously this case leaked a bare ``StopIteration``.

    Args:
        event_args: List of argument dicts. Each one is read for ``"subtype"``
            and, when a classifier is needed, ``"startOffset"``,
            ``"endOffset"`` and ``"text"``.
        doc: Parsed document exposing ``sents`` (spans with ``start_char``,
            ``end_char`` and ``text``) and ``text``, e.g. a spaCy ``Doc``.

    Returns:
        The same ``event_args`` list, each dict updated with a ``"role"`` key.

    Raises:
        KeyError: If a subtype is missing from ``arg_2_role``, or no
            classifier was loaded for a subtype with several roles.
    """
    sentences = None  # materialized lazily, only if some argument needs context

    for arg in event_args:
        roles = arg_2_role[arg["subtype"]]
        if len(roles) <= 1:
            arg["role"] = roles[0]
            continue

        if sentences is None:
            sentences = list(doc.sents)

        sent_embed = embed_model.encode(_find_context_text(arg, sentences, doc))
        arg_embed = embed_model.encode(arg["text"])
        # Sentence embedding first, then argument embedding — the same order
        # this function has always fed to the loaded classifiers.
        embed = np.concatenate((sent_embed, arg_embed))

        arg_clf = classifiers[arg["subtype"]]
        role_id = arg_clf.predict(embed.reshape(1, -1))
        arg["role"] = roles[role_id[0]]

    return event_args
| |
|
| |
|
| |
|