import copy
import csv
import glob
import logging
import os
import sys
from mongoengine import DoesNotExist
from pycoshark.mongomodels import Project, VCSSystem, Commit, File, CodeGroupState, CodeEntityState, CloneInstance
from pycoshark.utils import get_code_entity_state_identifier, get_code_group_state_identifier
logger = logging.getLogger("sourcemeter_parser")
class SourcemeterParser(object):
"""
    Parser that parses the results from SourceMeter

    :property output_path: path to an output directory, where files can be stored
    :property input_path: path to the revision that is used as input
    :property url: url to the repository of the project that is analyzed
    :property vcs_system_id: id of the vcs_system with the given url
    :property stored_files: list of files that are stored at the input path
    :property ordered_file_states: list that holds all results in an ordered manner (a state that has another
        state as its parent must come after that parent state)
    :property stored_file_states: states that were stored in the mongodb
    :property stored_meta_package_states: meta package states that were stored in the mongodb
    :property input_files: list of input files
    :property commit_id: id of the commit for which the data should be stored. :class:`bson.objectid.ObjectId`
"""
    def __init__(self, output_path, input_path, project_name, url, revision_hash, debug_level):
"""
Initialization
        :param output_path: path to an output directory, where files can be stored
        :param input_path: path to the revision that is used as input
        :param project_name: name of the project that is analyzed
        :param url: url to the repository of the project that is analyzed
        :param revision_hash: hash of the revision that is analyzed
        :param debug_level: debug level, as defined in :mod:`logging`
"""
# Set variables
self.output_path = output_path
self.input_path = input_path
self.project_name = project_name
self.url = url
self.revision_hash = revision_hash
# Default dictionaries and lists
self.ordered_file_states = {}
self.stored_file_states = {}
self.stored_meta_package_states = {}
self.input_files = []
# Get logger
logger.setLevel(debug_level)
# Get project id and find all stored files in the current input path (needed for java projects)
self.vcs_system_id = self.get_vcs_system_id()
self.commit_id = self.get_commit_id(self.vcs_system_id)
self.stored_files = self.find_stored_files()
# Prepare csv files
self.prepare_csv_files()
    def get_commit_id(self, vcs_system_id):
"""
        Gets the commit id for the given vcs system id and revision
:param vcs_system_id: id of the vcs system. :class:`bson.objectid.ObjectId`
:return: commit_id (:class:`bson.objectid.ObjectId`)
"""
try:
return Commit.objects(vcs_system_id=vcs_system_id, revision_hash=self.revision_hash).get().id
except DoesNotExist:
logger.error("Commit with vcs_system_id %s and revision %s does not exist" %
(vcs_system_id, self.revision_hash))
sys.exit(1)
    def get_vcs_system_id(self):
"""
        Gets the id of the vcs system for the parser's project name and url
:return: vcs_system_id (:class:`bson.objectid.ObjectId`)
"""
try:
project = Project.objects.get(name=self.project_name)
return VCSSystem.objects(url=self.url, project_id=project.id).get().id
except DoesNotExist:
logger.error("VCSSystem with the url %s does not exist in the database! Execute vcsSHARK first!" % self.url)
sys.exit(1)
    def find_stored_files(self):
"""
We need to find all files that are stored in the input path. This is needed to link the files that were parsed
with the files that are already stored via vcsSHARK.
:return: dictionary with file path as key and id as value (from vcsshark results)
"""
# get list of files in input_path
self.input_files = []
for root, dirs, files in os.walk(self.input_path, topdown=True):
for name in files:
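                # Strip the input path prefix so paths are relative to the repository
                # root; the result keeps a leading "/" (hence the "/.git/" check below)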
full_file_path = os.path.join(root, name).replace(self.input_path, "")
# Filter out git directory
if not full_file_path.startswith("/.git/"):
self.input_files.append(full_file_path)
# get all stored files of the project
stored_files = {}
for file in File.objects(vcs_system_id=self.vcs_system_id):
stored_files[file.path] = file.id
return stored_files
    @staticmethod
def get_csv_file(path):
"""
        Returns a file path or None if nothing is found.
        :param path: path to the file (glob pattern)
        :return: file path or None
"""
result = glob.glob(path)
if len(result) > 0:
return result[0]
return None
    def prepare_csv_files(self):
"""
        Prepares the csv files generated by SourceMeter by assigning a sort key to each row and sorting the rows by it
"""
all_csv_paths = {
'class': self.get_csv_file(os.path.join(self.output_path, "*-Class.csv")),
'enum': self.get_csv_file(os.path.join(self.output_path, "*-Enum.csv")),
'interface': self.get_csv_file(os.path.join(self.output_path, "*-Interface.csv")),
'method': self.get_csv_file(os.path.join(self.output_path, "*-Method.csv")),
'annotation': self.get_csv_file(os.path.join(self.output_path, "*-Annotation.csv")),
'attribute': self.get_csv_file(os.path.join(self.output_path, "*-Attribute.csv")),
'component': self.get_csv_file(os.path.join(self.output_path, "*-Component.csv")),
'file': self.get_csv_file(os.path.join(self.output_path, "*-File.csv")),
'function': self.get_csv_file(os.path.join(self.output_path, "*-Function.csv")),
'module': self.get_csv_file(os.path.join(self.output_path, "*-Module.csv")),
'package': self.get_csv_file(os.path.join(self.output_path, "*-Package.csv")),
'namespace': self.get_csv_file(os.path.join(self.output_path, "*-Namespace.csv")),
'structure': self.get_csv_file(os.path.join(self.output_path, "*-Structure.csv")),
'union': self.get_csv_file(os.path.join(self.output_path, "*-Union.csv")),
}
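        # Assign a numeric sort key to each row so that rows can later be ordered with
        # parents before children: 0 = rows without a parent, 1 = direct children of the
        # logical root, 2 = file rows, otherwise the numeric part of the parent id
        # (e.g., a parent "L123" yields the sort key "123")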
file_states = []
for name, path in all_csv_paths.items():
if path is not None:
logger.info("Open path: "+path)
with open(path) as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
row['type'] = name
if name == 'file':
row['sortKey'] = '2'
row['Path'] = row['LongName']
file_states.append(row)
continue
if 'Parent' in row:
if row['Parent'] == '__LogicalRoot__':
row['sortKey'] = '1'
file_states.append(row)
else:
row['sortKey'] = row['Parent'].strip('L')
file_states.append(row)
else:
row['sortKey'] = '0'
file_states.append(row)
file_states = sorted(file_states, key=lambda k: int(k['sortKey']))
self.ordered_file_states = self.sort_for_parent(file_states)
    @staticmethod
def sort_for_parent(state_dict):
"""
        Sorts the given states so that the parent of each state comes before the state itself.
        Special rules apply to file states, as they do not have any parents.
        :param state_dict: list of states that should be ordered
        :return: ordered list of states
        .. NOTE:: Example: X has parent Y, Y has parent Z. Therefore, the resulting order is:\
        Z -> Y -> X
"""
not_finished = True
new_dict = []
written_ids = []
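        # Repeatedly pass over the rows and emit each row once its parent has already
        # been emitted (or it needs no parent), until every row has been written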
while not_finished:
for row in state_dict:
if 'Parent' not in row and row['ID'] not in written_ids:
written_ids.append(row['ID'])
new_dict.append(row)
if 'Parent' in row and row['ID'] not in written_ids and (row['Parent'] in written_ids
or row['Parent'] == '__LogicalRoot__'
or row['type'] == 'file'):
written_ids.append(row['ID'])
new_dict.append(row)
if len(state_dict) == len(written_ids):
not_finished = False
return new_dict
    def store_data(self):
"""
        Stores the parsed data: rows that contain a 'Path' key are stored as file state data,
        all other rows as meta package data.
        :return:
"""
for row in self.ordered_file_states:
if 'Path' in row:
self.store_file_states_data(row)
else:
self.store_meta_package_data(row)
self.store_clone_data()
self.store_extra_data()
    def parse_pmd_file(self, path):
        """
        Parses the PMD warning file generated by SourceMeter and stores the warnings in the
        linter field of the corresponding code entity states.
        :param path: path to the PMD result file
        """
logger.info("Parsing & storing pmd warnings...")
with open(path) as pmd_file:
data = pmd_file.readlines()
# Go through all warnings that pmd reported
file_warnings = {}
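        # Each warning line has the form "<file path>(<line number>): <pmd type>: <message>"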
for line in data:
line = line.strip()
parts = line.split(":")
file_parts = parts[0].strip()
pmd_type = parts[1].strip()
message = parts[2].strip()
file_path = file_parts.split("(")[0]
file_path = file_path.replace(self.input_path.rstrip("/") + "/", "")
line_number = file_parts.split("(")[1].strip(")")
warnings = file_warnings.get(file_path, list())
warnings.append({"ln": int(line_number), "l_ty": pmd_type, "msg": message})
file_warnings[file_path] = warnings
logger.debug("Found the following pmd warnings: %s" % file_warnings)
for file_path, data in file_warnings.items():
try:
# Get file id
m_file = File.objects(path=file_path, vcs_system_id=self.vcs_system_id).get()
# Get code entity state
identifier = get_code_entity_state_identifier(file_path, self.commit_id, m_file.id)
m_ces = CodeEntityState.objects(s_key=identifier).get()
m_ces.linter.clear()
for warnings in data:
m_ces.linter.append(warnings)
# Save code entity state
m_ces.save()
except DoesNotExist:
logger.warning("Code Entity State for file %s does not exist!" % file_path)
    def get_component_ids(self, row_component_ids):
"""
        Resolves the component ids string of the csv row into the ObjectIds of the stored component states.
:param row_component_ids: component ids string
:return: ObjectIds of all components as list (:class:`bson.objectid.ObjectId`)
"""
# get list of objectids for all components in the csv file
row_component_ids = row_component_ids.split(",")
component_object_ids = []
for row_component_id in row_component_ids:
component_object_ids.append(self.stored_meta_package_states[row_component_id.strip()])
return component_object_ids
    def store_file_states_data(self, row):
"""
Stores the file states data.
Fills the stored_file_states property for less database communication.
        :param row: row that is processed
.. NOTE:: File states have a direct connection to a file from a revision.
"""
path_name = self.sanitize_long_name(row['Path'])
        # We only need to sanitize the long name for files; otherwise we store it as it comes out of SourceMeter
if row['type'] == 'file':
long_name = self.sanitize_long_name(row['LongName'])
else:
long_name = row['LongName']
cg_ids = []
ce_parent_id = None
if 'Parent' in row and row['Parent'] in self.stored_meta_package_states:
cg_ids.append(self.stored_meta_package_states[row['Parent']])
elif 'Parent' in row and row['Parent'] in self.stored_file_states:
ce_parent_id = self.stored_file_states[row['Parent']]
elif 'Parent' in row and row['type'] != 'file':
logger.warning("ERROR! Parent not found for %s!" % row)
if 'Component' in row:
cg_ids.extend(self.get_component_ids(row['Component']))
start_line = None
end_line = None
start_column = None
end_column = None
if 'Line' in row and 'EndLine' in row and 'Column' in row and 'EndColumn' in row:
start_line = row['Line']
end_line = row['EndLine']
start_column = row['Column']
end_column = row['EndColumn']
try:
s_key = get_code_entity_state_identifier(long_name, self.commit_id, self.stored_files[path_name])
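            # Prefix each metric with "set__metrics__" so that the MongoEngine upsert below
            # sets the individual fields of the embedded metrics document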
tmp = {'set__metrics__{}'.format(k): v for k, v in self.sanitize_metrics_dictionary(copy.deepcopy(row)).items()}
tmp['s_key'] = s_key
tmp['long_name'] = long_name
tmp['commit_id'] = self.commit_id
tmp['file_id'] = self.stored_files[path_name]
tmp['ce_type'] = row['type']
tmp['cg_ids'] = cg_ids
tmp['ce_parent_id'] = ce_parent_id
tmp['start_line'] = start_line
tmp['end_line'] = end_line
tmp['start_column'] = start_column
tmp['end_column'] = end_column
state_id = CodeEntityState.objects(s_key=s_key).upsert_one(**tmp).id
self.stored_file_states[row['ID']] = state_id
except KeyError:
            # This should not happen, but it can, e.g., for the conftest.cpp file of C/C++ projects, which
            # is only created temporarily
logger.warning("Could not store results for file %s" % path_name)
    def store_clone_data(self):
"""
Parses and stores the cloning data that was generated by sourcemeter.
"""
logger.info("Parsing & storing clone data...")
clone_class_csv_path = glob.glob(os.path.join(self.output_path, "*-CloneClass.csv"))[0]
clone_instance_csv_path = glob.glob(os.path.join(self.output_path, "*-CloneInstance.csv"))[0]
clone_classes = {}
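        # Read all clone classes first; each clone instance row references its clone
        # class via its Parent column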
with open(clone_class_csv_path) as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
clone_classes[row['ID']] = self.sanitize_metrics_dictionary(copy.deepcopy(row))
with open(clone_instance_csv_path) as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
metrics_dict = self.sanitize_metrics_dictionary(copy.deepcopy(row))
long_name = self.sanitize_long_name(row['Path'])
tmp = {
'commit_id': self.commit_id,
'name': row['ID'],
'file_id': self.stored_files[long_name],
'clone_class': row['Parent'],
'clone_class_metrics': clone_classes[row['Parent']],
'clone_instance_metrics': metrics_dict,
'start_line': row['Line'],
'end_line': row['EndLine'],
'start_column': row['Column'],
'end_column': row['EndColumn']
}
CloneInstance.objects(name=row['ID'], commit_id=self.commit_id,
file_id=self.stored_files[long_name]).upsert_one(**tmp)
logger.info("Finished parsing & storing clone data!")
    @staticmethod
def sanitize_metrics_dictionary(metrics):
"""
        Helper function that sanitizes the csv reader row so that it contains only its metrics.
:param metrics: csv reader row
:return: dictionary of metrics
"""
        del metrics['Name']
        del metrics['ID']
        # Drop all remaining non-metric columns; pop() ignores keys that are absent
        for key in ('LongName', 'type', 'sortKey', 'Component', 'Path', 'Parent',
                    'WarningBlocker', 'WarningCritical', 'WarningInfo', 'WarningMajor',
                    'WarningMinor', 'Line', 'EndLine', 'Column', 'EndColumn'):
            metrics.pop(key, None)
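        # Convert metric values to float where possible; empty values become 0.0 and
        # non-numeric values are kept as strings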
for name, value in metrics.items():
if not value:
metrics[name] = float(0)
else:
try:
metrics[name] = float(value)
except ValueError:
metrics[name] = value
return metrics
    def sanitize_long_name(self, orig_long_name):
"""
Sanitizes the long_name of the row.
1) If the long_name has the input path in it: just strip it
2) If the long_name has the output path in it: just strip it
        3) Otherwise: the long_name is split on "/" and rejoined without its first component.
:param orig_long_name: long_name of the row
:return: sanitized long_name
.. NOTE:: This is necessary, as the output of sourcemeter can be different based on which processor is used.
"""
if self.input_path in orig_long_name:
long_name = orig_long_name.replace(self.input_path + "/", "")
elif self.output_path in orig_long_name:
long_name = orig_long_name.replace(self.output_path + "/", "")
else:
long_name = "/".join(orig_long_name.strip("/").split('/')[1:])
# if long_name is not an empty string
if long_name:
long_name = self.get_fullpath(long_name)
else:
long_name = orig_long_name
if long_name is not None and long_name.startswith("/"):
long_name = long_name.strip("/")
return long_name
    def get_fullpath(self, long_name):
"""
        If the long_name matches the end of one of the files found in the input path, the corresponding full file path is returned.
:param long_name: long_name of the row
:return: new long_name
"""
for file_name in self.input_files:
if file_name.endswith(long_name):
return file_name
return long_name
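

# Example usage (a minimal sketch; the paths, project name, url, and revision hash
# below are hypothetical):
#
#     parser = SourcemeterParser("/tmp/sm-output", "/tmp/checkout", "myproject",
#                                "https://github.com/org/myproject", "abc123",
#                                logging.DEBUG)
#     parser.store_data()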