Source code for glare.db.sqlalchemy.api

# Copyright (c) 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import hashlib
import operator
import threading

from oslo_config import cfg
from oslo_db import exception as db_exception
from oslo_db import options
from oslo_db.sqlalchemy import session
from oslo_log import log as os_logging
from oslo_utils import timeutils
import osprofiler.sqlalchemy
from retrying import retry
import six
import sqlalchemy
from sqlalchemy import and_
import sqlalchemy.exc
from sqlalchemy import exists
from sqlalchemy import func
from sqlalchemy import or_
import sqlalchemy.orm as orm
from sqlalchemy.orm import aliased
from sqlalchemy.orm import joinedload

from glare.common import exception
from glare.common import semver_db
from glare.common import utils
from glare.db.sqlalchemy import models
from glare.i18n import _

LOG = os_logging.getLogger(__name__)

CONF = cfg.CONF
CONF.import_group("profiler", "glare.common.wsgi")
options.set_defaults(CONF)


BASE_ARTIFACT_PROPERTIES = ('id', 'visibility', 'created_at', 'updated_at',
                            'activated_at', 'owner', 'status', 'description',
                            'name', 'type_name', 'version')

_FACADE = None
_LOCK = threading.Lock()


def _retry_on_deadlock(exc):
    """Decorator to retry a DB API call if Deadlock was received."""

    if isinstance(exc, db_exception.DBDeadlock):
        LOG.warning("Deadlock detected. Retrying...")
        return True
    return False


def _create_facade_lazily():
    global _LOCK, _FACADE
    if _FACADE is None:
        with _LOCK:
            if _FACADE is None:
                _FACADE = session.EngineFacade.from_config(CONF)

                if CONF.profiler.enabled and CONF.profiler.trace_sqlalchemy:
                    osprofiler.sqlalchemy.add_tracing(sqlalchemy,
                                                      _FACADE.get_engine(),
                                                      "db")
    return _FACADE


def get_engine():
    facade = _create_facade_lazily()
    return facade.get_engine()


def get_session(autocommit=True, expire_on_commit=False):
    facade = _create_facade_lazily()
    return facade.get_session(autocommit=autocommit,
                              expire_on_commit=expire_on_commit)


def setup_db():
    engine = get_engine()
    models.register_models(engine)


def drop_db():
    engine = get_engine()
    models.unregister_models(engine)
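

# Illustrative usage sketch, not part of the original module: a minimal
# bootstrap for a fresh deployment, assuming the [database] connection
# option has already been set via oslo.config. The function name is made
# up for the example.
def _example_bootstrap():
    setup_db()            # create all Glare tables on the configured engine
    return get_session()  # session for subsequent DB API calls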


@retry(retry_on_exception=_retry_on_deadlock, wait_fixed=500,
       stop_max_attempt_number=50)
def delete(context, artifact_id, session):
    with session.begin():
        session.query(models.Artifact).filter_by(id=artifact_id).delete()


def _drop_protected_attrs(model_class, values):
    """Remove protected attributes from the values dictionary using the
    model's __protected_attributes__ field.
    """
    for attr in model_class.__protected_attributes__:
        if attr in values:
            del values[attr]


@retry(retry_on_exception=_retry_on_deadlock, wait_fixed=500,
       stop_max_attempt_number=50)
@utils.no_4byte_params
def create_or_update(context, artifact_id, values, session):
    with session.begin():
        _drop_protected_attrs(models.Artifact, values)
        if artifact_id is None:
            # create new artifact
            artifact = models.Artifact()
            artifact.id = values.pop('id')
        else:
            # update the existing artifact
            artifact = _get(context, artifact_id, session)

        if 'version' in values:
            values['version'] = semver_db.parse(values['version'])

        if 'tags' in values:
            tags = values.pop('tags')
            artifact.tags = _do_tags(artifact, tags)

        if 'properties' in values:
            properties = values.pop('properties', {})
            artifact.properties = _do_properties(artifact, properties)

        if 'blobs' in values:
            blobs = values.pop('blobs')
            artifact.blobs = _do_blobs(artifact, blobs)

        artifact.updated_at = timeutils.utcnow()
        if 'status' in values:
            if session.query(exists().where(and_(
                models.ArtifactBlob.status == 'saving',
                models.ArtifactBlob.artifact_id == artifact_id))
            ).one()[0]:
                raise exception.Conflict(
                    "You cannot change artifact status if it has "
                    "uploading blobs.")
            if values['status'] == 'active':
                artifact.activated_at = timeutils.utcnow()
        artifact.update(values)

        artifact.save(session=session)
        LOG.debug("Response from the database was received.")

        return artifact.to_dict()


def _get(context, artifact_id, session):
    try:
        query = _do_artifacts_query(context, session).filter_by(
            id=artifact_id)
        artifact = query.one()
    except orm.exc.NoResultFound:
        msg = _("Artifact with id=%s not found.") % artifact_id
        LOG.warning(msg)
        raise exception.ArtifactNotFound(msg)
    return artifact
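

# Illustrative sketch, not part of the original module: the two branches of
# create_or_update(). ``context`` is assumed to be a RequestContext with a
# ``tenant`` attribute; all field values below are made up.
def _example_create_then_activate(context):
    session = get_session()
    values = {'id': '0f1d2c3b-0000-0000-0000-000000000000',
              'name': 'ubuntu', 'version': '16.04', 'type_name': 'images',
              'owner': context.tenant, 'status': 'drafted'}
    # artifact_id=None takes the "create new artifact" branch
    af = create_or_update(context, None, values, session)
    # an existing id takes the update branch; switching the status to
    # 'active' also stamps activated_at (unless a blob is still saving)
    return create_or_update(context, af['id'], {'status': 'active'}, session)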


def get(context, artifact_id, session):
    return _get(context, artifact_id, session).to_dict()


def get_all(context, session, filters=None, marker=None, limit=None,
            sort=None, latest=False):
    """List all visible artifacts.

    :param filters: dict of filter keys and values
    :param marker: artifact id after which to start the page
    :param limit: maximum number of artifacts to return
    :param sort: a list of (key, dir, type) tuples, where key is the
     attribute to sort by, dir is the direction ('asc' or 'desc'), and
     type is the type of the attribute ('bool', 'string', 'numeric' or
     'int'), or None if the attribute is a base property
    :param latest: flag that indicates that only artifacts with the
     highest versions should be returned in the output
    """
    artifacts = _get_all(
        context, session, filters, marker, limit, sort, latest)
    return [af.to_dict() for af in artifacts]
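

# Illustrative sketch, not part of the original module: listing artifacts
# with filters and sort specs in the shapes that _do_query_filters() and
# _do_paginate_query() expect; the concrete values are made up.
def _example_list_two_pages(context):
    session = get_session()
    # filter tuples: (field_name, key_name, op, field_type, value)
    filters = [('type_name', None, 'eq', 'string', 'images')]
    # sort tuples: (key, dir, type); type is None for base properties
    sort = [('name', 'asc', None)]
    page = get_all(context, session, filters=filters, limit=20, sort=sort)
    if not page:
        return []
    # keyset pagination: the last id of a page becomes the next marker
    return get_all(context, session, filters=filters, limit=20, sort=sort,
                   marker=page[-1]['id'])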


def _apply_latest_filter(context, session, query,
                         basic_conds, tag_conds, prop_conds):
    # Subquery to fetch max version suffix for a group (name, version_prefix)
    ver_suffix_subq = _apply_query_base_filters(
        session.query(
            models.Artifact.name,
            models.Artifact.version_prefix,
            func.max(models.Artifact.version_suffix).label(
                'max_suffix')).group_by(
            models.Artifact.name, models.Artifact.version_prefix),
        context)
    ver_suffix_subq = _apply_user_filters(
        ver_suffix_subq, basic_conds, tag_conds, prop_conds).subquery()
    # Subquery to fetch max version prefix for a name group
    ver_prefix_subq = _apply_query_base_filters(
        session.query(models.Artifact.name, func.max(
            models.Artifact.version_prefix).label('max_prefix')).group_by(
            models.Artifact.name),
        context)
    ver_prefix_subq = _apply_user_filters(
        ver_prefix_subq, basic_conds, tag_conds, prop_conds).subquery()

    # Combine the two subqueries by joining them with the Artifact table
    query = query.join(
        ver_prefix_subq,
        and_(models.Artifact.name == ver_prefix_subq.c.name,
             models.Artifact.version_prefix ==
             ver_prefix_subq.c.max_prefix)).join(
        ver_suffix_subq,
        and_(models.Artifact.name == ver_suffix_subq.c.name,
             models.Artifact.version_prefix ==
             ver_suffix_subq.c.version_prefix,
             models.Artifact.version_suffix ==
             ver_suffix_subq.c.max_suffix)
    )

    return query


def _apply_user_filters(query, basic_conds, tag_conds, prop_conds):
    if basic_conds:
        for basic_condition in basic_conds:
            query = query.filter(and_(*basic_condition))

    if tag_conds:
        for tag_condition in tag_conds:
            query = query.join(models.ArtifactTag, aliased=True).filter(
                and_(*tag_condition))

    if prop_conds:
        for prop_condition in prop_conds:
            query = query.join(models.ArtifactProperty, aliased=True).filter(
                and_(*prop_condition))

    return query


def _get_all(context, session, filters=None, marker=None, limit=None,
             sort=None, latest=False):
    filters = filters or {}

    query = _do_artifacts_query(context, session)
    basic_conds, tag_conds, prop_conds = _do_query_filters(filters)

    query = _apply_user_filters(query, basic_conds, tag_conds, prop_conds)

    if latest:
        query = _apply_latest_filter(context, session, query,
                                     basic_conds, tag_conds, prop_conds)

    marker_artifact = None
    if marker is not None:
        marker_artifact = get(context, marker, session)

    query = _do_paginate_query(query=query, limit=limit,
                               marker=marker_artifact, sort=sort)

    return query.all()


def _do_paginate_query(query, marker=None, limit=None, sort=None):
    # Add sorting
    number_of_custom_props = 0
    for sort_key, sort_dir, sort_type in sort:
        try:
            sort_dir_func = {
                'asc': sqlalchemy.asc,
                'desc': sqlalchemy.desc,
            }[sort_dir]
        except KeyError:
            msg = _("Unknown sort direction, must be 'desc' or 'asc'.")
            raise exception.BadRequest(msg)
        # Note(mfedosin): workaround for the fact that sqlalchemy cannot
        # work with composite keys correctly
        if sort_key == 'version':
            query = query.order_by(
                sort_dir_func(models.Artifact.version_prefix))\
                .order_by(sort_dir_func(models.Artifact.version_suffix))\
                .order_by(sort_dir_func(models.Artifact.version_meta))
        elif sort_key in BASE_ARTIFACT_PROPERTIES:
            # sort by generic property
            query = query.order_by(sort_dir_func(getattr(models.Artifact,
                                                         sort_key)))
        else:
            # sort by custom property
            number_of_custom_props += 1
            if number_of_custom_props > 1:
                msg = _("For performance's sake it's not allowed to sort by "
                        "more than one custom property with this db backend.")
                raise exception.BadRequest(msg)
            prop_table = aliased(models.ArtifactProperty)
            query = (
                query.join(prop_table).
                filter(prop_table.name == sort_key).
                order_by(sort_dir_func(getattr(prop_table,
                                               sort_type + '_value'))))

    # Add pagination
    if marker is not None:
        marker_values = []
        for sort_key, __, __ in sort:
            v = marker.get(sort_key, None)
            marker_values.append(v)

        # Build up an array of sort criteria as in the docstring
        criteria_list = []
        for i in range(len(sort)):
            crit_attrs = []
            for j in range(i):
                value = marker_values[j]
                if sort[j][0] in BASE_ARTIFACT_PROPERTIES:
                    if sort[j][0] == 'version':
                        value = semver_db.parse(value)
                    crit_attrs.append([getattr(models.Artifact,
                                               sort[j][0]) == value])
                else:
                    conds = [models.ArtifactProperty.name == sort[j][0]]
                    conds.extend([getattr(models.ArtifactProperty,
                                          sort[j][2] + '_value') == value])
                    crit_attrs.append(conds)

            value = marker_values[i]
            sort_dir_func = operator.gt if sort[i][1] == 'asc' \
                else operator.lt
            if sort[i][0] in BASE_ARTIFACT_PROPERTIES:
                if sort[i][0] == 'version':
                    value = semver_db.parse(value)
                crit_attrs.append([sort_dir_func(getattr(models.Artifact,
                                                         sort[i][0]), value)])
            else:
                query = query.join(models.ArtifactProperty, aliased=True)
                conds = [models.ArtifactProperty.name == sort[i][0]]
                conds.extend([sort_dir_func(getattr(models.ArtifactProperty,
                                                    sort[i][2] + '_value'),
                                            value)])
                crit_attrs.append(conds)

            criteria = [and_(*crit_attr) for crit_attr in crit_attrs]
            criteria_list.append(criteria)

        criteria_list = [and_(*cr) for cr in criteria_list]
        query = query.filter(or_(*criteria_list))

    if limit is not None:
        query = query.limit(limit)

    return query


def _do_artifacts_query(context, session):
    """Build the query to get all artifacts based on the context"""
    query = session.query(models.Artifact)

    query = (query.options(joinedload(models.Artifact.properties)).
             options(joinedload(models.Artifact.tags)).
             options(joinedload(models.Artifact.blobs)))

    return _apply_query_base_filters(query, context)
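

# Note (illustrative, not from the original module): for a marker artifact M
# and sort=[('name', 'asc', None), ('version', 'desc', None)], the
# pagination block of _do_paginate_query() above builds, roughly:
#
#     WHERE name > M.name
#        OR (name = M.name AND version < M.version)
#
# i.e. one OR branch per sort key, each branch pinning all earlier keys with
# equality and constraining the current key with > or < by its direction.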


def _apply_query_base_filters(query, context):
    # If admin, return everything.
    if context.is_admin:
        return query

    # If anonymous user, return only public artifacts.
    # However, if context.tenant has a value, return both
    # public and private artifacts of the owner.
    if context.tenant is not None:
        query = query.filter(
            or_(models.Artifact.owner == context.tenant,
                models.Artifact.visibility == 'public'))
    else:
        query = query.filter(
            models.Artifact.visibility == 'public')

    return query


op_mappings = {
    'eq': operator.eq,
    'gt': operator.gt,
    'gte': operator.ge,
    'lt': operator.lt,
    'lte': operator.le,
    'neq': operator.ne,
}


def _do_query_filters(filters):
    basic_conds = []
    tag_conds = []
    prop_conds = []
    for field_name, key_name, op, field_type, value in filters:
        if field_name == 'tags':
            tags = utils.split_filter_value_for_quotes(value)
            for tag in tags:
                tag_conds.append([models.ArtifactTag.value == tag])
        elif field_name == 'tags-any':
            tags = utils.split_filter_value_for_quotes(value)
            tag_conds.append([models.ArtifactTag.value.in_(tags)])
        elif field_name in BASE_ARTIFACT_PROPERTIES:
            if op != 'in':
                fn = op_mappings[op]
                if field_name == 'version':
                    value = semver_db.parse(value)
                basic_conds.append([fn(getattr(models.Artifact, field_name),
                                       value)])
            else:
                if field_name == 'version':
                    value = [semver_db.parse(val) for val in value]
                    basic_conds.append(
                        [or_(*[models.Artifact.version == ver
                               for ver in value])])
                else:
                    basic_conds.append(
                        [getattr(models.Artifact, field_name).in_(value)])
        else:
            conds = [models.ArtifactProperty.name == field_name]
            if key_name is not None:
                if op == 'eq' or value is not None:
                    conds.extend(
                        [models.ArtifactProperty.key_name == key_name])
                elif op == 'in':
                    conds.extend(
                        [models.ArtifactProperty.key_name.in_(key_name)])
            if value is not None:
                if op != 'in':
                    fn = op_mappings[op]
                    conds.extend([fn(getattr(models.ArtifactProperty,
                                             field_type + '_value'), value)])
                else:
                    conds.extend([getattr(models.ArtifactProperty,
                                          field_type + '_value').in_(value)])

            prop_conds.append(conds)

    return basic_conds, tag_conds, prop_conds


def _do_tags(artifact, new_tags):
    tags_to_update = []
    # don't touch existing tags
    for tag in artifact.tags:
        if tag.value in new_tags:
            tags_to_update.append(tag)
            new_tags.remove(tag.value)
    # add new tags
    for tag in new_tags:
        db_tag = models.ArtifactTag()
        db_tag.value = tag
        tags_to_update.append(db_tag)
    return tags_to_update


def _get_prop_type(value):
    if isinstance(value, bool):
        return 'bool_value'
    if isinstance(value, int):
        return 'int_value'
    if isinstance(value, six.string_types):
        return 'string_value'
    if isinstance(value, float):
        return 'numeric_value'


def _create_property(prop_name, prop_value, position=None, key_name=None):
    db_prop = models.ArtifactProperty()
    db_prop.name = prop_name
    setattr(db_prop, _get_prop_type(prop_value), prop_value)
    db_prop.position = position
    db_prop.key_name = key_name
    return db_prop


def _do_properties(artifact, new_properties):
    props_to_update = []
    # don't touch the existing properties
    for prop in artifact.properties:
        if prop.name not in new_properties:
            props_to_update.append(prop)

    for prop_name, prop_value in new_properties.items():
        if prop_value is None:
            continue
        if isinstance(prop_value, list):
            for pos, list_prop in enumerate(prop_value):
                for prop in artifact.properties:
                    if prop.name == prop_name and pos == prop.position:
                        if getattr(prop,
                                   _get_prop_type(list_prop)) != list_prop:
                            setattr(prop, _get_prop_type(list_prop),
                                    list_prop)
                        props_to_update.append(prop)
                        break
                else:
                    props_to_update.append(
                        _create_property(prop_name, list_prop, position=pos))
        elif isinstance(prop_value, dict):
            for dict_key, dict_val in prop_value.items():
                for prop in artifact.properties:
                    if prop.name == prop_name and prop.key_name == dict_key:
                        if getattr(prop,
                                   _get_prop_type(dict_val)) != dict_val:
                            setattr(prop, _get_prop_type(dict_val), dict_val)
                        props_to_update.append(prop)
                        break
                else:
                    props_to_update.append(
                        _create_property(prop_name, dict_val,
                                         key_name=dict_key))
        elif prop_value is not None:
            for prop in artifact.properties:
                if prop.name == prop_name:
                    setattr(prop, _get_prop_type(prop_value), prop_value)
                    props_to_update.append(prop)
                    break
            else:
                props_to_update.append(_create_property(
                    prop_name, prop_value))

    return props_to_update


def _update_blob_values(blob, values):
    for elem in ('size', 'md5', 'sha1', 'sha256', 'url', 'external',
                 'status', 'content_type'):
        setattr(blob, elem, values[elem])
    return blob


def _do_blobs(artifact, new_blobs):
    blobs_to_update = []
    # don't touch the existing blobs
    for blob in artifact.blobs:
        if blob.name not in new_blobs:
            blobs_to_update.append(blob)

    for blob_name, blob_value in new_blobs.items():
        if blob_value is None:
            continue
        if isinstance(blob_value.get('status'), str):
            for blob in artifact.blobs:
                if blob.name == blob_name:
                    _update_blob_values(blob, blob_value)
                    blobs_to_update.append(blob)
                    break
            else:
                blob = models.ArtifactBlob()
                blob.name = blob_name
                _update_blob_values(blob, blob_value)
                blobs_to_update.append(blob)
        else:
            for dict_key, dict_val in blob_value.items():
                for blob in artifact.blobs:
                    if blob.name == blob_name and blob.key_name == dict_key:
                        _update_blob_values(blob, dict_val)
                        blobs_to_update.append(blob)
                        break
                else:
                    blob = models.ArtifactBlob()
                    blob.name = blob_name
                    blob.key_name = dict_key
                    _update_blob_values(blob, dict_val)
                    blobs_to_update.append(blob)

    return blobs_to_update
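

# Illustrative sketch, not part of the original module: how _do_properties()
# flattens values into ArtifactProperty rows. It assumes a fresh, unsaved
# models.Artifact() exposes an empty ``properties`` collection.
def _example_property_rows():
    return _do_properties(
        models.Artifact(),                 # no existing properties
        {'ram': 1024,                      # -> one row, int_value=1024
         'nics': ['eth0', 'eth1'],         # -> two rows, position=0 and 1
         'metadata': {'arch': 'x86_64'}})  # -> one row, key_name='arch'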


def count_artifact_number(context, session, type_name=None):
    """Return the number of artifacts owned by the tenant."""
    query = session.query(func.count(models.Artifact.id)).filter(
        models.Artifact.owner == context.tenant)
    if type_name is not None:
        query = query.filter(models.Artifact.type_name == type_name)
    return query.order_by(None).scalar() or 0


def calculate_uploaded_data(context, session, type_name=None):
    """Return the amount of uploaded data for the tenant."""
    query = session.query(
        func.sum(models.ArtifactBlob.size)).join(
        models.Artifact, aliased=True).filter(
        models.Artifact.owner == context.tenant)
    if type_name is not None:
        query = query.filter(models.Artifact.type_name == type_name)
    return query.order_by(None).scalar() or 0
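

# Illustrative sketch, not part of the original module: a quota check of the
# kind a caller might run before accepting an upload. The 10 GiB ceiling is
# a made-up value, and blob sizes are assumed to be stored in bytes.
def _example_check_storage_quota(context, incoming_size):
    session = get_session()
    used = calculate_uploaded_data(context, session)
    if used + incoming_size > 10 * 1024 ** 3:
        raise exception.Forbidden("Storage quota exceeded.")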


def _generate_quota_id(project_id, quota_name, type_name=None):
    quota_id = b"%s:%s" % (project_id.encode(), quota_name.encode())
    if type_name is not None:
        quota_id += b":%s" % type_name.encode()
    return hashlib.md5(quota_id).hexdigest()


@retry(retry_on_exception=_retry_on_deadlock, wait_fixed=500,
       stop_max_attempt_number=50)
@utils.no_4byte_params
def set_quotas(values, session):
    """Create new quota instances in the database."""
    with session.begin():
        for project_id, project_quotas in values.items():
            # reset all project quotas
            session.query(models.ArtifactQuota).filter(
                models.ArtifactQuota.project_id == project_id).delete()

            # generate new quotas
            for quota_name, quota_value in project_quotas.items():
                q = models.ArtifactQuota()
                q.project_id = project_id
                q.quota_name = quota_name
                q.quota_value = quota_value
                session.add(q)

        # save all quotas
        session.flush()


@retry(retry_on_exception=_retry_on_deadlock, wait_fixed=500,
       stop_max_attempt_number=50)
def get_all_quotas(session, project_id=None):
    """List all available quotas."""
    query = session.query(models.ArtifactQuota)
    if project_id is not None:
        query = query.filter(
            models.ArtifactQuota.project_id == project_id)
    quotas = query.order_by(models.ArtifactQuota.project_id).all()

    res = {}
    for quota in quotas:
        res.setdefault(
            quota.project_id, {})[quota.quota_name] = quota.quota_value

    return res


@retry(retry_on_exception=_retry_on_deadlock, wait_fixed=500,
       stop_max_attempt_number=50)
@utils.no_4byte_params
def create_lock(context, lock_key, session):
    """Try to create a lock record."""
    with session.begin():
        existing = session.query(models.ArtifactLock).get(lock_key)
        if existing is None:
            try:
                lock = models.ArtifactLock()
                lock.id = lock_key
                lock.save(session=session)
                return lock.id
            except (sqlalchemy.exc.IntegrityError,
                    db_exception.DBDuplicateEntry):
                msg = _("Cannot lock an item with key %s. "
                        "Lock already acquired by another request") % lock_key
                raise exception.Conflict(msg)
        else:
            if timeutils.is_older_than(existing.acquired_at, 5):
                existing.acquired_at = timeutils.utcnow()
                existing.save(session)
                return existing.id
            else:
                msg = _("Cannot lock an item with key %s. "
                        "Lock already acquired by another request") % lock_key
                raise exception.Conflict(msg)


@retry(retry_on_exception=_retry_on_deadlock, wait_fixed=500,
       stop_max_attempt_number=50)
def delete_lock(context, lock_id, session):
    with session.begin():
        session.query(models.ArtifactLock).filter_by(id=lock_id).delete()


@retry(retry_on_exception=_retry_on_deadlock, wait_fixed=500,
       stop_max_attempt_number=50)
def save_blob_data(context, blob_data_id, data, session):
    """Save blob data to the database."""
    with session.begin():
        blob_data = models.ArtifactBlobData()
        blob_data.id = blob_data_id
        blob_data.data = data.read()
        blob_data.save(session=session)
        return "sql://" + blob_data.id


@retry(retry_on_exception=_retry_on_deadlock, wait_fixed=500,
       stop_max_attempt_number=50)
def save_blob_data_batch(context, blobs, session):
    """Perform batch uploading to the database."""
    with session.begin():
        locations = []

        # blobs is a list of tuples (blob_data_id, data)
        for blob_data_id, data in blobs:
            blob_data = models.ArtifactBlobData()
            blob_data.id = blob_data_id
            blob_data.data = data.read()
            session.add(blob_data)
            locations.append("sql://" + blob_data.id)

        session.flush()
        return locations


@retry(retry_on_exception=_retry_on_deadlock, wait_fixed=500,
       stop_max_attempt_number=50)
def get_blob_data(context, uri, session):
    """Download blob data from the database."""
    blob_data_id = uri[6:]
    try:
        blob_data = session.query(
            models.ArtifactBlobData).filter_by(id=blob_data_id).one()
    except orm.exc.NoResultFound:
        msg = _("Cannot find blob data with id %s.") % blob_data_id
        raise exception.NotFound(msg)
    return blob_data.data


@retry(retry_on_exception=_retry_on_deadlock, wait_fixed=500,
       stop_max_attempt_number=50)
def delete_blob_data(context, uri, session):
    """Delete blob data from the database."""
    with session.begin():
        blob_data_id = uri[6:]
        session.query(
            models.ArtifactBlobData).filter_by(id=blob_data_id).delete()