Source code for marshmallow_utils.schemas.identifier
# -*- coding: utf-8 -*-
#
# Copyright (C) 2021 CERN.
#
# Marshmallow-Utils is free software; you can redistribute it and/or modify
# it under the terms of the MIT License; see LICENSE file for more details.
"""Persistent identifier Schema with automatic scheme detection.
Integrates idutils library to detect the scheme.
"""
import idutils
from marshmallow import Schema, ValidationError, post_load, pre_load, validates_schema
from ..fields import SanitizedUnicode
[docs]class IdentifierSchema(Schema):
"""Identifier with automatic scheme detection."""
identifier = SanitizedUnicode()
scheme = SanitizedUnicode()
error_messages = {
"unknown_scheme": "No valid scheme recognized for identifier.",
"invalid_identifier": "Invalid {scheme} identifier.",
"invalid_scheme": "Invalid scheme.",
"required": "Missing data for required field.",
}
def __init__(self, allowed_schemes, identifier_required=True, **kwargs):
"""Constructor.
:param allowed_schemes: a dictionary of allowed schemes. Each key must
contain a validator function and a scheme label.
:param identifier_required: True when the identifier value is required.
"""
self.identifier_required = identifier_required
self.allowed_schemes = allowed_schemes
super().__init__(**kwargs)
def _intersect_with_order(self, detected_schemes):
"""Returns the first detected scheme that is allowed."""
allowed_schemes = set(self.allowed_schemes.keys())
for detected in detected_schemes:
if detected in allowed_schemes:
return detected
return None
[docs] @pre_load(pass_many=False)
def load_scheme(self, data, **kwargs):
"""Loads the scheme of the identifier."""
# If no identifier provided, proceed to validation
identifier = data.get("identifier")
if not identifier:
return data
# If identifier and scheme is provided, proceed to validation.
scheme = data.get("scheme")
if scheme:
return data
# If identifier but no scheme is provided, try to detect scheme
detected_schemes = idutils.detect_identifier_schemes(identifier)
# Select a valid scheme from the detected ones.
detected_scheme = self._intersect_with_order(detected_schemes)
if detected_scheme:
data["scheme"] = detected_scheme
return data
[docs] @validates_schema
def validate_identifier(self, data, **kwargs):
"""Validate the identifier format and scheme."""
identifier = data.get("identifier")
scheme = data.get("scheme")
# Bail if identifier is not required and identifier/scheme is not
# provided
if not self.identifier_required and not identifier and not scheme:
return
errors = dict()
# Validate scheme
if not scheme and identifier:
errors["scheme"] = self.error_messages["unknown_scheme"]
elif not scheme:
errors["scheme"] = self.error_messages["required"]
elif scheme not in self.allowed_schemes:
errors["scheme"] = self.error_messages["invalid_scheme"]
# Validate identifier
if not identifier:
errors["identifier"] = self.error_messages["required"]
elif scheme and scheme in self.allowed_schemes:
validator = self.allowed_schemes[scheme]["validator"]
if not validator(identifier):
message = self.error_messages["invalid_identifier"]
scheme_label = self.allowed_schemes[scheme].get("label", scheme)
errors["identifier"] = message.format(scheme=scheme_label)
if errors:
raise ValidationError(errors)
[docs] @post_load
def normalize_identifier(self, data, **kwargs):
"""Normalizes the identifier based on the scheme."""
identifier = data.get("identifier")
# It can be empty if not required
if identifier:
# at this point, `scheme` is set or validation failed earlier
scheme = data["scheme"]
# will return the same value if not able to normalize by idutils
data["identifier"] = idutils.normalize_pid(identifier, scheme)
return data