Source code for pyvcsshark.datastores.mongostore

import datetime
import logging
import multiprocessing
import os
import sys
import tarfile

from mongoengine import connect, DoesNotExist, NotUniqueError
from pymongo.errors import DocumentTooLarge, DuplicateKeyError

from pycoshark.mongomodels import VCSSystem, Project, Commit, Tag, File, People, FileAction, Hunk, Branch
from pycoshark.utils import create_mongodb_uri_string
from pyvcsshark.datastores.basestore import BaseStore


logger = logging.getLogger("store")


class MongoStore(BaseStore):
    """Datastore implementation for saving data to MongoDB. Inherits from
    :class:`pyvcsshark.datastores.basestore.BaseStore`.

    :property commit_queue: instance of a :class:`multiprocessing.JoinableQueue`, which holds objects of
        :class:`pyvcsshark.dbmodels.models.CommitModel` that should be put into MongoDB

    :property logger: holds the logging instance, obtained by calling logging.getLogger("store")
    """

    commit_queue = None

    def __init__(self):
        BaseStore.__init__(self)

    def initialize(self, config, repository_url, repository_type):
        """Initializes the mongostore by connecting to MongoDB, creating the project in the project collection,
        and setting up processes (see :class:`pyvcsshark.datastores.mongostore.CommitStorageProcess`) that read
        commits out of the commit queue, process them, and store them in MongoDB.

        :param config: all configuration
        :param repository_url: url of the repository that is to be analyzed
        :param repository_type: type of the repository that is to be analyzed (e.g., "git")
        """
        logger.setLevel(config.debug_level)
        logger.info("Initializing MongoStore...")

        # Create queue for multiprocessing
        self.commit_queue = multiprocessing.JoinableQueue()

        # We need an extra queue for branches, because all commits need to be finished
        # before we can process branches
        self.branch_queue = multiprocessing.JoinableQueue()

        self.config = config
        self.cores_per_job = config.cores_per_job

        # The user we authenticate with is expected to be in the configured authentication database
        logger.info("Connecting to MongoDB...")
        uri = create_mongodb_uri_string(config.db_user, config.db_password, config.db_hostname, config.db_port,
                                        config.db_authentication, config.ssl_enabled)
        connect(config.db_database, host=uri, connect=False)

        # Get project_id
        try:
            project_id = Project.objects(name=config.project_name).get().id
        except DoesNotExist:
            logger.error('Project with name "%s" does not exist in database!' % config.project_name)
            sys.exit(1)

        # Create the VCS system if it does not exist yet, otherwise update it (upsert)
        vcs_system = VCSSystem.objects(url=repository_url).upsert_one(url=repository_url,
                                                                      repository_type=repository_type,
                                                                      last_updated=datetime.datetime.today(),
                                                                      project_id=project_id)
        self.vcs_system_id = vcs_system.id

        # Name of the tar.gz archive, based on the project name
        tar_gz_name = '{}.tar.gz'.format(config.project_name)

        # Create a tar.gz archive of the repository folder
        with tarfile.open(tar_gz_name, "w:gz") as tar:
            tar.add(config.path, arcname=config.project_name)

        if vcs_system.repository_file.grid_id is None:
            # Add the repository archive to GridFS if it does not exist yet
            logger.info('Copying project to gridfs...')
            with open(tar_gz_name, 'rb') as tar_file:
                vcs_system.repository_file.put(tar_file, content_type='application/gzip', filename=tar_gz_name)
                vcs_system.save()
        else:
            # Otherwise replace the existing archive
            logger.info('Replacing project file in gridfs...')
            with open(tar_gz_name, 'rb') as tar_file:
                vcs_system.repository_file.replace(tar_file, content_type='application/gzip',
                                                   filename=tar_gz_name)
                vcs_system.save()

        # Delete the local tar.gz file
        os.remove(tar_gz_name)

        # Get the last commit of the project by committer date (if there is any)
        last_commit = Commit.objects(vcs_system_id=self.vcs_system_id)\
            .only('committer_date').order_by('-committer_date').first()

        if last_commit is not None:
            last_commit_date = last_commit.committer_date
        else:
            last_commit_date = None

        # Start the workers; they will wait until something comes into the queue and then process it
        for i in range(self.cores_per_job):
            name = "StorageProcess-%d" % i
            process = CommitStorageProcess(self.commit_queue, self.vcs_system_id, last_commit_date, self.config,
                                           name)
            process.daemon = True
            process.start()

        logger.info("Starting storage Process...")

    @property
    def store_identifier(self):
        """Returns the identifier **mongo** for this datastore"""
        return 'mongo'

    def add_commit(self, commit_model):
        """Adds a commit of class :class:`pyvcsshark.dbmodels.models.CommitModel` to the commit queue"""
        # Add to queue
        self.commit_queue.put(commit_model)
        return

    def add_branch(self, branch_model):
        """Add branch to extra queue"""
        self.branch_queue.put(branch_model)
        return

    def finalize(self):
        """As we depend on commits being finished before branches (for the references), we must first wait for
        the commits to finish before we can start our branch processing."""
        self.commit_queue.join()

        # After commits are finished, process branches
        for i in range(self.cores_per_job):
            name = "StorageProcessBranch-%d" % i
            process = BranchStorageProcess(self.branch_queue, self.vcs_system_id, self.config, name)
            process.daemon = True
            process.start()

        # Wait for branches to finish
        self.branch_queue.join()

        logger.info("Storing Process complete...")
        return
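

# The following sketch is illustrative only and not part of pyvcsshark's API: it shows the call
# order that MongoStore expects from a driver. The `parsed_commits` and `parsed_branches`
# iterables are hypothetical placeholders for the output of the repository parser, and the URL
# is a dummy value.
def _example_store_lifecycle(config, parsed_commits, parsed_branches):
    store = MongoStore()
    # Connects to MongoDB, archives the repository into GridFS, and spawns the
    # CommitStorageProcess workers
    store.initialize(config, 'https://example.org/repo.git', 'git')
    for commit_model in parsed_commits:
        store.add_commit(commit_model)  # enqueued for the commit workers
    for branch_model in parsed_branches:
        store.add_branch(branch_model)  # only processed in finalize()
    # Joins the commit queue first, then spawns the branch workers and joins the branch queue
    store.finalize()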


class BranchStorageProcess(multiprocessing.Process):
    """Process that reads objects of class :class:`pyvcsshark.dbmodels.models.BranchModel` from the branch
    queue and stores them in MongoDB."""

    def __init__(self, queue, vcs_system_id, config, name):
        multiprocessing.Process.__init__(self)
        uri = create_mongodb_uri_string(config.db_user, config.db_password, config.db_hostname, config.db_port,
                                        config.db_authentication, config.ssl_enabled)
        connect(config.db_database, host=uri, connect=False)
        self.queue = queue
        self.vcs_system_id = vcs_system_id
        self.proc_name = name

    def run(self):
        """Endless loop for the process:

        1. Get an object of class :class:`pyvcsshark.dbmodels.models.BranchModel` from the queue

        2. Check if this branch was stored before; if so, update the branch, otherwise create it
        """
        while True:
            branch = self.queue.get()
            logger.debug("Process {} is processing branch {} -> {}".format(self.proc_name, branch.name,
                                                                           branch.target))

            # Get the commit document that the branch target ref points to
            mongo_commit = Commit.objects.get(vcs_system_id=self.vcs_system_id, revision_hash=branch.target)

            # Try to get the branch; create it if it does not exist yet
            try:
                mongo_branch = Branch.objects.get(vcs_system_id=self.vcs_system_id, name=branch.name)
            except DoesNotExist:
                mongo_branch = Branch(vcs_system_id=self.vcs_system_id, name=branch.name,
                                      commit_id=mongo_commit.id).save()

            mongo_branch.commit_id = mongo_commit.id
            mongo_branch.is_origin_head = branch.is_origin_head
            mongo_branch.save()
            logger.debug("Process %s saved branch %s. Queue size: %d" % (self.proc_name, branch.name,
                                                                         self.queue.qsize()))
            self.queue.task_done()
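

# Minimal sketch of the JoinableQueue protocol that both storage processes rely on: every get()
# must be paired with task_done(), otherwise the join() calls in MongoStore.finalize() would
# block forever. Illustrative only and not used by pyvcsshark.
def _example_queue_roundtrip():
    queue = multiprocessing.JoinableQueue()
    queue.put('refs/heads/master')
    item = queue.get()
    queue.task_done()  # marks the fetched item as processed
    queue.join()       # returns immediately because all items are done
    return item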


class CommitStorageProcess(multiprocessing.Process):
    """Class that inherits from :class:`multiprocessing.Process` for processing instances of class
    :class:`pyvcsshark.dbmodels.models.CommitModel` and writing them into MongoDB.

    :param queue: queue in which the :class:`pyvcsshark.dbmodels.models.CommitModel` objects are stored
    :param vcs_system_id: object id of class :class:`bson.objectid.ObjectId` of the VCS system
    :param last_commit_date: object of class :class:`datetime.datetime`, which holds the committer date of the
        last commit that was parsed
    :param config: object of class :class:`pyvcsshark.config.Config`, which holds configuration information
    :param name: name of the process
    """

    def __init__(self, queue, vcs_system_id, last_commit_date, config, name):
        multiprocessing.Process.__init__(self)
        uri = create_mongodb_uri_string(config.db_user, config.db_password, config.db_hostname, config.db_port,
                                        config.db_authentication, config.ssl_enabled)
        connect(config.db_database, host=uri, connect=False)
        self.queue = queue
        self.vcs_system_id = vcs_system_id
        self.last_commit_date = last_commit_date
        self.proc_name = name

    def run(self):
        """Endless loop for the process, which consists of several steps:

        1. Get an object of class :class:`pyvcsshark.dbmodels.models.CommitModel` from the queue

        2. Check if this commit was stored before; if so, update branches and tags (if they have changed)

        3. Store author and committer in MongoDB

        4. Store tags in MongoDB

        5. Create a list of branches to which the commit belongs

        6. Save the different file actions that were performed in this commit in MongoDB

        7. Save the commit itself

        .. NOTE:: The committer date is used to check if a commit was already stored before. Meaning: we get
            the last commit out of the database and check if the committer date of each commit we process is
            greater than the committer date of that last commit.

        .. WARNING:: For commits that were already processed, we only look for changed tags and branches!
        """
        while True:
            commit = self.queue.get()
            logger.debug("Process %s is processing commit with hash %s." % (self.proc_name, commit.id))

            # Try to get the commit; create a stub if it does not exist yet
            try:
                mongo_commit = Commit.objects(vcs_system_id=self.vcs_system_id, revision_hash=commit.id).get()
            except DoesNotExist:
                mongo_commit = Commit(vcs_system_id=self.vcs_system_id, revision_hash=commit.id).save()

            self.set_whole_commit(mongo_commit, commit)

            # Save the commit object
            mongo_commit.save()
            logger.debug("Process %s saved commit with hash %s. Queue size: %d" % (self.proc_name, commit.id,
                                                                                   self.queue.qsize()))
            self.queue.task_done()

    def set_whole_commit(self, mongo_commit, commit):
        """Fills all fields of the given commit document from the parsed commit model."""
        # Create tags
        logger.debug("Process %s is creating tags for commit with hash %s." % (self.proc_name, commit.id))
        self.create_tags(mongo_commit.id, commit.tags)

        # Create branch list
        logger.debug("Process %s is creating branches for commit with hash %s." % (self.proc_name, commit.id))
        mongo_commit.branches = self.create_branch_list(commit.branches)

        # Create people
        logger.debug("Process %s is setting author for commit with hash %s." % (self.proc_name, commit.id))
        mongo_commit.author_id = self.create_people(commit.author.name, commit.author.email)
        mongo_commit.author_date = commit.authorDate
        mongo_commit.author_date_offset = commit.authorOffset

        logger.debug("Process %s is setting committer for commit with hash %s." % (self.proc_name, commit.id))
        mongo_commit.committer_id = self.create_people(commit.committer.name, commit.committer.email)
        mongo_commit.committer_date = commit.committerDate
        mongo_commit.committer_date_offset = commit.committerOffset

        # Set parent hashes
        logger.debug("Process %s is setting parents for commit with hash %s." % (self.proc_name, commit.id))
        mongo_commit.parents = commit.parents

        # Set message
        logger.debug("Process %s is setting message for commit with hash %s." % (self.proc_name, commit.id))
        mongo_commit.message = commit.message

        # Create file actions
        logger.debug("Process %s is setting file actions for commit with hash %s." % (self.proc_name, commit.id))
        self.create_file_actions(commit.changedFiles, mongo_commit.id)

    def create_branch_list(self, branches):
        """Creates a list of the names of the different branches to which a commit belongs. We go through the
        branches property of the class :class:`pyvcsshark.dbmodels.models.CommitModel`, which is a list of
        branch objects of class :class:`pyvcsshark.dbmodels.models.BranchModel`.

        :param branches: list of objects of class :class:`pyvcsshark.dbmodels.models.BranchModel`
        """
        branch_list = []
        for branch in branches:
            if branch is not None:
                branch_list.append(branch.name)
        if len(branch_list) == 0:
            branch_list = None

        return branch_list

    def create_tags(self, commit_id, tags):
        """Creates Tag documents (pycoshark library) for all tags of the commit and returns them as a list."""
        tag_list = []
        for tag in tags:
            if tag.tagger is not None:
                tagger_id = self.create_people(tag.tagger.name, tag.tagger.email)
                try:
                    logger.debug("Process %s is creating tag %s with tagger." % (self.proc_name, tag.name))
                    mongo_tag = Tag(commit_id=commit_id, name=tag.name, message=tag.message,
                                    tagger_id=tagger_id, date=tag.taggerDate, date_offset=tag.taggerOffset,
                                    vcs_system_id=self.vcs_system_id).save()
                except (DuplicateKeyError, NotUniqueError):
                    logger.debug("Process %s found tag with tagger with name %s." % (self.proc_name, tag.name))
                    mongo_tag = Tag.objects(commit_id=commit_id, name=tag.name).only('id', 'name').get()
            else:
                try:
                    logger.debug("Process %s is creating tag %s." % (self.proc_name, tag.name))
                    mongo_tag = Tag(commit_id=commit_id, name=tag.name, date=tag.taggerDate,
                                    date_offset=tag.taggerOffset, vcs_system_id=self.vcs_system_id).save()
                except (DuplicateKeyError, NotUniqueError):
                    logger.debug("Process %s found tag %s." % (self.proc_name, tag.name))
                    mongo_tag = Tag.objects(commit_id=commit_id, name=tag.name).only('id', 'name').get()
            tag_list.append(mongo_tag)
        return tag_list

    def create_people(self, name, email):
        """Creates a people object of type People (which can be found in the pycoshark library) and returns the
        object id of type :class:`bson.objectid.ObjectId` of the stored object.

        :param name: name of the contributor
        :param email: email of the contributor

        .. NOTE:: Saving and falling back to a lookup on a duplicate key error is process safe here, as the
            unique index on the people collection rejects concurrent inserts of the same person.
        """
        try:
            logger.debug("Process %s is creating person with email %s and name %s." % (self.proc_name, email,
                                                                                       name))
            people_id = People(name=name, email=email).save().id
        except (DuplicateKeyError, NotUniqueError):
            logger.debug("Process %s found person with email %s and name %s." % (self.proc_name, email, name))
            people_id = People.objects(name=name, email=email).only('id').get().id
        return people_id

    def create_file_actions(self, files, mongo_commit_id):
        """Creates a list of object ids of type :class:`bson.objectid.ObjectId` for the different file actions
        of the commit by transforming the files into file actions of type FileAction, File, and Hunk
        (pycoshark library).

        :param files: list of changed files of type :class:`pyvcsshark.dbmodels.models.FileModel`
        :param mongo_commit_id: mongo id of the commit that is processed

        .. NOTE:: The hunks are inserted via bulk insert.
        """
        for file in files:
            # Check if the file was part of a copy or move action (then the oldPath attribute is not None)
            old_file_id = None
            if file.oldPath is not None:
                logger.debug("Process %s is creating old file with path %s." % (self.proc_name, file.oldPath))
                try:
                    old_file_id = File(vcs_system_id=self.vcs_system_id, path=file.oldPath).save().id
                except (DuplicateKeyError, NotUniqueError):
                    logger.debug("Process %s found old file with path %s." % (self.proc_name, file.oldPath))
                    old_file_id = File.objects(vcs_system_id=self.vcs_system_id,
                                               path=file.oldPath).only('id').get().id

            # Create a new file object
            try:
                logger.debug("Process %s is creating file with path %s." % (self.proc_name, file.path))
                new_file_id = File(vcs_system_id=self.vcs_system_id, path=file.path).save().id
            except (DuplicateKeyError, NotUniqueError):
                logger.debug("Process %s found file with path %s." % (self.proc_name, file.path))
                new_file_id = File.objects(vcs_system_id=self.vcs_system_id, path=file.path).only('id').get().id

            # Get the existing file action and delete its hunks (they are reinserted below);
            # if there is none, create a new file action
            try:
                logger.debug("Process %s is creating file action with file_id %s." % (self.proc_name,
                                                                                      new_file_id))
                file_action_id = FileAction.objects(file_id=new_file_id, commit_id=mongo_commit_id,
                                                    parent_revision_hash=file.parent_revision_hash).get().id
                logger.debug("Process %s is deleting all hunks for file action id %s." % (self.proc_name,
                                                                                          file_action_id))
                Hunk.objects(file_action_id=file_action_id).all().delete()
            except DoesNotExist:
                file_action_id = FileAction(file_id=new_file_id,
                                            commit_id=mongo_commit_id,
                                            size_at_commit=file.size,
                                            lines_added=file.linesAdded,
                                            lines_deleted=file.linesDeleted,
                                            is_binary=file.isBinary,
                                            mode=file.mode,
                                            old_file_id=old_file_id,
                                            parent_revision_hash=file.parent_revision_hash).save().id

            # Create hunk objects for bulk insert
            logger.debug("Process %s is creating hunks for bulk insert." % self.proc_name)
            hunks = []
            for hunk in file.hunks:
                mongo_hunk = Hunk(file_action_id=file_action_id, new_start=hunk.new_start,
                                  new_lines=hunk.new_lines, old_start=hunk.old_start, old_lines=hunk.old_lines,
                                  content=hunk.content)
                hunks.append(mongo_hunk)

            # Bulk insert the hunks if the list is not empty; if the bulk insert exceeds MongoDB's
            # document size limit, fall back to saving the hunks one by one
            if hunks:
                try:
                    logger.debug("Process %s is inserting hunks..." % self.proc_name)
                    Hunk.objects.insert(hunks, load_bulk=False)
                except DocumentTooLarge:
                    for hunk in hunks:
                        try:
                            hunk.save()
                        except DocumentTooLarge:
                            logger.info("Document was too large for commit: %s" % mongo_commit_id)
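

# Illustrative read-side sketch (not part of the store): once finalize() has run, the stored
# documents can be queried through the same pycoshark models, e.g. listing all commits of a VCS
# system ordered by committer date. The default URL is a placeholder.
def _example_list_commit_hashes(repository_url='https://example.org/repo.git'):
    vcs_system = VCSSystem.objects(url=repository_url).get()
    commits = Commit.objects(vcs_system_id=vcs_system.id).order_by('committer_date').only('revision_hash')
    return [commit.revision_hash for commit in commits]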