# -*- coding: utf-8 -*-
"""Parameter Value Language decoder.
The definition of PVL used in this module is based on the Consultative
Committee for Space Data Systems, and their Parameter Value
Language Specification (CCSD0006 and CCSD0008), CCSDS 641.0-B-2,
referred to as the Blue Book with a date of June 2000.
A decoder deals with converting strings given to it (typically
by the parser) to the appropriate Python type.
"""
# Copyright 2015, 2017, 2019-2021, ``pvl`` library authors.
#
# Reuse is permitted under the terms of the license.
# The AUTHORS file and the LICENSE file are at the
# top level of this library.
import re
from datetime import datetime, timedelta, timezone
from decimal import InvalidOperation
from itertools import repeat, chain
from warnings import warn
from .grammar import PVLGrammar, ODLGrammar, PDSGrammar
from .collections import Quantity
from .exceptions import QuantityError
def for_try_except(exception, function, *iterable):
    """Return the result of the first successful application of *function*
    to an element of *iterable*.  If the *function* raises an Exception
    of type *exception*, it will continue to the next item of *iterable*.
    If there are no successful applications an Exception of type
    *exception* will be raised.

    If additional *iterable* arguments are passed, *function* must
    take that many arguments and is applied to the items from
    all iterables in parallel (like ``map()``).  With multiple iterables,
    the iterator stops when the shortest iterable is exhausted.

    :param exception: the exception class that marks an application of
        *function* as unsuccessful.
    :param function: the callable to apply.
    :param iterable: one or more iterables that supply the arguments.
    :raises exception: if no application succeeds.  When at least one
        application was attempted, the error from the last attempt is
        re-raised so that its message and traceback are preserved.
    """
    last_error = None
    for args in zip(*iterable):
        try:
            return function(*args)
        except exception as err:
            # The name bound by ``except ... as`` is deleted when the
            # handler exits, so keep our own reference to the failure.
            last_error = err

    if last_error is not None:
        # Every attempt failed: surface the most recent failure rather
        # than a new, message-less exception.
        raise last_error

    # Nothing was attempted at all (an empty iterable).
    raise exception
class PVLDecoder(object):
    """A decoder based on the rules in the CCSDS-641.0-B-2 'Blue Book'
    which defines the PVL language.

    :param grammar: defaults to a :class:`pvl.grammar.PVLGrammar` object.
        If provided, it must be an instance of that class (or of one
        of its subclasses).
    :param quantity_cls: defaults to :class:`pvl.collections.Quantity`, but
        could be any class object that takes two arguments, where the
        first is the value, and the second is the units value.
    :param real_cls: defaults to :class:`float`, but could be any class object
        that can be constructed from a `str` object.
    :raises TypeError: if *grammar* is provided but is not a PVLGrammar.
    """

    def __init__(self, grammar=None, quantity_cls=None, real_cls=None):
        self.errors = []

        if grammar is None:
            self.grammar = PVLGrammar()
        elif isinstance(grammar, PVLGrammar):
            self.grammar = grammar
        else:
            raise TypeError(
                "The grammar must be an instance of PVLGrammar (or one of "
                f"its subclasses), not {type(grammar).__name__}."
            )

        self.quantity_cls = Quantity if quantity_cls is None else quantity_cls
        self.real_cls = float if real_cls is None else real_cls

    def decode(self, value: str):
        """Returns a Python object based on *value*."""
        return self.decode_simple_value(value)

    def decode_simple_value(self, value: str):
        """Returns a Python object based on *value*, assuming
        that *value* can be decoded as a PVL Simple Value::

         <Simple-Value> ::= (<Date-Time> | <Numeric> | <String>)

        The grammar's none, true, and false keywords are checked first
        (ignoring case), then each of the quoted string, non-decimal,
        decimal, and datetime decoders are tried in that order.  If
        none of those succeed, *value* is decoded as an unquoted string,
        which raises a ValueError if that is not possible either.
        """
        folded = value.casefold()
        for keyword, result in (
            (self.grammar.none_keyword, None),
            (self.grammar.true_keyword, True),
            (self.grammar.false_keyword, False),
        ):
            if folded == keyword.casefold():
                return result

        for decoder in (
            self.decode_quoted_string,
            self.decode_non_decimal,
            self.decode_decimal,
            self.decode_datetime,
        ):
            try:
                return decoder(value)
            except ValueError:
                pass

        return self.decode_unquoted_string(value)

    def decode_unquoted_string(self, value: str) -> str:
        """Returns a Python ``str`` if *value* can be decoded
        as an unquoted string, based on this decoder's grammar.
        Raises a ValueError otherwise.

        An unquoted string may not contain comment delimiters, whitespace,
        or reserved characters; may not be an aggregation keyword or an
        End-Statement; and may not be decodable as a date or time.
        """
        for description, forbidden in (
            ("a comment", chain.from_iterable(self.grammar.comments)),
            ("some whitespace", self.grammar.whitespace),
            ("a special character", self.grammar.reserved_characters),
        ):
            for item in forbidden:
                if item in value:
                    raise ValueError(
                        "Expected a Simple Value, but encountered "
                        f'{description} in "{value}": "{item}".'
                    )

        folded = value.casefold()

        # items() yields (begin, end) keyword pairs, so chaining them
        # checks both the opening and the closing keywords.
        agg_keywords = self.grammar.aggregation_keywords.items()
        for kw in chain.from_iterable(agg_keywords):
            if kw.casefold() == folded:
                raise ValueError(
                    "Expected a Simple Value, but encountered "
                    f'an aggregation keyword: "{value}".'
                )

        for es in self.grammar.end_statements:
            if es.casefold() == folded:
                raise ValueError(
                    "Expected a Simple Value, but encountered "
                    f'an End-Statement: "{value}".'
                )

        # The decode rules for Unquoted Strings spell out the things
        # that they cannot be, so if *value* *can* be decoded as a
        # datetime, then it *can't* be an Unquoted String.  The raise
        # must live in the ``else`` clause: if it were inside the ``try``
        # it would be swallowed by the ``except`` that is meant to
        # handle only the failure to decode.
        try:
            self.decode_datetime(value)
        except ValueError:
            pass
        else:
            raise ValueError(
                f'The value "{value}" can be decoded as a date or time, '
                "so it is not an Unquoted String."
            )

        return str(value)

    def decode_quoted_string(self, value: str) -> str:
        """Returns a Python ``str`` if *value* begins and ends
        with matching quote characters based on this decoder's
        grammar.  Raises ValueError otherwise.
        """
        for q in self.grammar.quotes:
            # The length check rejects a lone quote character, which
            # would otherwise both "start" and "end" with itself.
            if value.startswith(q) and value.endswith(q) and len(value) > 1:
                return str(value[1:-1])

        raise ValueError(f'The object "{value}" is not a PVL Quoted String.')

    def decode_decimal(self, value: str):
        """Returns a Python ``int`` or ``self.real_cls`` object, as appropriate
        based on *value*.  Raises a ValueError otherwise.
        """
        try:
            return int(value, base=10)
        except ValueError:
            try:
                return self.real_cls(str(value))
            except InvalidOperation as err:
                # decimal.Decimal signals a bad string with
                # InvalidOperation; translate it so that callers only
                # need to handle ValueError.
                raise ValueError(
                    f'The value "{value}" is not a decimal number.'
                ) from err

    def decode_non_decimal(self, value: str) -> int:
        """Returns a Python ``int`` as decoded from *value*
        on the assumption that *value* conforms to a
        non-decimal integer value as defined by this decoder's
        grammar, raises ValueError otherwise.
        """
        # Non-Decimal (Binary, Octal, and Hex)
        for nd_re in (
            self.grammar.binary_re,
            self.grammar.octal_re,
            self.grammar.hex_re,
        ):
            match = nd_re.fullmatch(value)
            if match is not None:
                # Groups that did not participate (e.g. an absent sign)
                # become empty strings so they can be concatenated.
                d = match.groupdict("")
                return int(d["sign"] + d["non_decimal"], base=int(d["radix"]))

        raise ValueError(
            f'The value "{value}" is not a non-decimal integer.'
        )

    def decode_datetime(self, value: str):  # noqa: C901
        """Takes a string and attempts to convert it to the appropriate
        Python ``datetime`` ``time``, ``date``, or ``datetime``
        type based on this decoder's grammar, or in one case, a ``str``.
        Raises a ValueError if *value* cannot be decoded.

        The PVL standard allows for the seconds value to range
        from zero to 60, so that the 60 can accommodate leap
        seconds.  However, the Python ``datetime`` classes don't
        support second values for more than 59 seconds.

        If a time with 60 seconds is encountered, it will not be
        returned as a datetime object (since that is not representable
        via Python datetime objects), but simply as a string.

        The user can then try and use the ``time`` module
        to parse this string into a ``time.struct_time``.  We
        chose not to do this with pvl because ``time.struct_time``
        is a full *datetime* like object, even if it parsed
        only a *time* like object, the year, month, and day
        values in the ``time.struct_time`` would default, which
        could be misleading.

        Alternately, the pvl.grammar.PVLGrammar class contains
        two regexes: ``leap_second_Ymd_re`` and ``leap_second_Yj_re``
        which could be used along with the ``re.match`` object's
        ``groupdict()`` function to extract the string representations
        of the various numerical values, cast them to the appropriate
        numerical types, and do something useful with them.
        """
        try:
            # datetime.date objects will always be naive, so just return:
            return for_try_except(
                ValueError,
                datetime.strptime,
                repeat(value),
                self.grammar.date_formats,
            ).date()
        except ValueError:
            pass

        # datetime.time and datetime.datetime might be naive or aware:
        d = None
        try:
            d = for_try_except(
                ValueError,
                datetime.strptime,
                repeat(value),
                self.grammar.time_formats,
            ).time()
        except ValueError:
            try:
                d = for_try_except(
                    ValueError,
                    datetime.strptime,
                    repeat(value),
                    self.grammar.datetime_formats,
                )
            except ValueError:
                pass

        if d is not None:
            if d.utcoffset() is None:
                # Naive result: a trailing "Z" means UTC, otherwise fall
                # back on the grammar's default timezone, if it has one.
                if value.endswith("Z"):
                    return d.replace(tzinfo=timezone.utc)
                elif self.grammar.default_timezone is not None:
                    return d.replace(tzinfo=self.grammar.default_timezone)
            return d

        # if we can regex a 60-second time, return str
        if self.is_leap_seconds(value):
            return str(value)

        raise ValueError(
            f'The value "{value}" cannot be decoded as a date or time.'
        )

    def is_leap_seconds(self, value: str) -> bool:
        """Returns True if *value* is a time that matches the
        grammar's definition of a leap seconds time (a time string with
        a value of 60 for the seconds value).  False otherwise."""
        return any(
            # A grammar may set either regex to None to disable it.
            r is not None and r.fullmatch(value) is not None
            for r in (
                self.grammar.leap_second_Ymd_re,
                self.grammar.leap_second_Yj_re,
            )
        )

    def decode_quantity(self, value, unit):
        """Returns a Python object that represents a value with
        an associated unit, based on the values provided via
        *value* and *unit*.  This function creates an object
        based on the decoder's *quantity_cls*.

        :raises QuantityError: if *quantity_cls* raises a ValueError.
        """
        try:
            return self.quantity_cls(value, str(unit))
        except ValueError as err:
            raise QuantityError(err) from err
class ODLDecoder(PVLDecoder):
    """A decoder based on the rules in the PDS3 Standards Reference
    (version 3.8, 27 Feb 2009) Chapter 12: Object Description
    Language Specification and Usage.

    Extends PVLDecoder, and if *grammar* is not specified, it will
    default to an ODLGrammar() object.
    """

    def __init__(self, grammar=None, quantity_cls=None, real_cls=None):
        if grammar is None:
            grammar = ODLGrammar()

        super().__init__(
            grammar=grammar,
            quantity_cls=quantity_cls,
            real_cls=real_cls
        )

    def decode_datetime(self, value: str):
        """Extends parent function to also deal with datetimes
        and times with a time zone offset.

        If it cannot, it will raise a ValueError.
        """
        try:
            return super().decode_datetime(value)
        except ValueError:
            pass

        # If there is a +HH:MM or a -HH:MM suffix that can be
        # stripped, then we're in business.  Otherwise ...
        match = re.fullmatch(
            r"(?P<dt>.+?)"  # the part before the sign
            r"(?P<sign>[+-])"  # required sign
            r"(?P<hour>0?[0-9]|1[0-2])"  # 0 to 12
            fr"(?:{self.grammar._M_frag})?",  # Minutes
            value,
        )
        if match is None:
            raise ValueError(
                f'The value "{value}" cannot be decoded as a date or time.'
            )

        # Absent minutes default to 0 so that int() below still works.
        gd = match.groupdict(default=0)
        dt = super().decode_datetime(gd["dt"])

        if isinstance(dt, str):
            # The parent returns leap second (60 second) times as plain
            # strings, which cannot carry a tzinfo, so stay consistent
            # and hand back the complete original text.
            return str(value)

        if not hasattr(dt, "tzinfo"):
            # A bare date cannot take a time zone; calling replace() on
            # it would raise a TypeError instead of the ValueError that
            # callers of this method are prepared for.
            raise ValueError(
                f'The value "{value}" has a time zone offset, but '
                f'"{gd["dt"]}" is a date, which cannot have one.'
            )

        offset = timedelta(
            hours=int(gd["hour"]), minutes=int(gd["minute"])
        )
        if gd["sign"] == "-":
            offset = -1 * offset
        return dt.replace(tzinfo=timezone(offset))

    def decode_non_decimal(self, value: str) -> int:
        """Extends parent function by allowing the wider variety of
        radix values that ODL permits over PVL.
        """
        match = self.grammar.nondecimal_re.fullmatch(value)
        if match is None:
            raise ValueError(
                f'The value "{value}" is not a non-decimal integer.'
            )

        # Groups that did not participate become empty strings.
        d = match.groupdict("")
        return int(d["sign"] + d["non_decimal"], base=int(d["radix"]))

    def decode_quoted_string(self, value: str) -> str:
        """Extends parent function because the
        ODL specification allows for a dash (-) line continuation
        character that results in the dash, the line end, and any
        leading whitespace on the next line to be removed.  It also
        allows for a sequence of format effectors surrounded by
        spacing characters to be collapsed to a single space.
        """
        s = super().decode_quoted_string(value)

        ws = "".join(self.grammar.whitespace)
        # Escape the characters before placing them inside a regex
        # character class, so that none of them can be misread as
        # regex syntax.
        fe_class = re.escape("".join(self.grammar.format_effectors))
        ws_class = re.escape(ws)

        # Deal with dash (-) continuation:
        nodash = re.sub(fr"-[{fe_class}][{ws_class}]*", "", s)

        # Collapse every run of whitespace to a single space, and strip
        # leading and trailing whitespace.
        return re.sub(fr"[{ws_class}]+", " ", nodash.strip(ws))

    def decode_unquoted_string(self, value: str) -> str:
        """Extends parent function to provide the extra enforcement that only
        ODL Identifier text may be unquoted as a value.
        """
        s = super().decode_unquoted_string(value)

        if self.is_identifier(s):
            return s

        raise ValueError(
            f"Only text that qualifies as an ODL Identifier may be "
            f"unquoted as a value, and '{s}' is not."
        )

    @staticmethod
    def is_identifier(value):
        """Returns True if *value* is an ODL Identifier, False otherwise.

        An ODL Identifier is composed of (ASCII) letters, digits, and
        underscores.  The first character must be a letter, and the
        last must not be an underscore.  Anything that is not a ``str``
        is not an identifier.
        """
        if not isinstance(value, str):
            return False

        # A leading letter, optionally followed by any run of letters,
        # digits, and underscores that ends in a letter or digit.  The
        # explicit ranges keep the match ASCII-only.
        return (
            re.fullmatch(r"[A-Za-z](?:[A-Za-z0-9_]*[A-Za-z0-9])?", value)
            is not None
        )
class PDSLabelDecoder(ODLDecoder):
    """A decoder based on the rules in the PDS3 Standards Reference
    (version 3.8, 27 Feb 2009) Chapter 12: Object Description
    Language Specification and Usage.

    Extends ODLDecoder, and if *grammar* is not specified, it will
    default to a PDSGrammar() object.

    :param grammar: the grammar to use, defaults to a PDSGrammar() object.
    :param quantity_cls: passed through to the parent class.
    :param real_cls: passed through to the parent class.
    """

    def __init__(self, grammar=None, quantity_cls=None, real_cls=None):
        if grammar is None:
            grammar = PDSGrammar()

        super().__init__(
            grammar=grammar,
            quantity_cls=quantity_cls,
            real_cls=real_cls
        )

    def decode_datetime(self, value: str):
        """Overrides parent function since PDS3 forbids a timezone
        specification, and times with a precision more than milliseconds.

        If it cannot decode properly, it will raise a ValueError.
        """
        # Deliberately skip ODLDecoder.decode_datetime(), which is the
        # method that adds time zone offset handling, and use the next
        # implementation after it in the method resolution order.
        t = super(ODLDecoder, self).decode_datetime(value)

        # date objects (and the str returned for leap second times) have
        # no microsecond attribute, so only times and datetimes are
        # checked.  Any remainder means sub-millisecond precision.
        if hasattr(t, "microsecond") and t.microsecond % 1000 != 0:
            raise ValueError(
                f"The PDS specification does not allow time values with "
                f"precision greater than milliseconds, and this has "
                f"microsecond precision: {t}."
            )

        return t
class OmniDecoder(ODLDecoder):
    """A permissive decoder that attempts to parse all forms of
    "PVL" that are thrown at it.

    Extends ODLDecoder.
    """

    def decode_non_decimal(self, value: str) -> int:
        """Extends parent function by allowing a plus or
        minus sign to be in two different positions
        in a non-decimal number, since PVL has one
        specification, and ODL has another.

        Raises a ValueError if *value* is not a non-decimal number,
        or if it has a sign in both positions.
        """
        # Non-Decimal with a variety of radix values and sign
        # positions.
        match = self.grammar.nondecimal_re.fullmatch(value)
        if match is None:
            raise ValueError(
                f'The value "{value}" is not a non-decimal integer.'
            )

        # Groups that did not participate become empty strings.
        d = match.groupdict("")
        sign = d["sign"]
        # Not every grammar's regex defines the second sign position.
        second_sign = d.get("second_sign", "")

        if sign != "" and second_sign != "":
            raise ValueError(
                f'The non-decimal value, "{value}", has two signs.'
            )

        return int(
            (sign or second_sign) + d["non_decimal"], base=int(d["radix"])
        )

    def decode_datetime(self, value: str):
        """Returns an appropriate Python datetime time, date, or datetime
        object by using the 3rd party dateutil library (if present)
        to parse an ISO 8601 datetime string in *value*.  If it cannot,
        or the dateutil library is not present, it will raise a
        ValueError.
        """
        try:
            return super().decode_datetime(value)
        except ValueError:
            pass

        # Only the import itself is guarded, so that an ImportError can
        # only ever mean that dateutil is unavailable.
        try:
            from dateutil.parser import isoparser
        except ImportError as err:
            warn(
                "The dateutil library is not present, so more "
                "exotic date and time formats beyond the PVL/ODL "
                "set cannot be parsed.",
                ImportWarning,
            )
            raise ValueError(
                f'The value "{value}" cannot be decoded as a date or '
                "time without the dateutil library."
            ) from err

        isop = isoparser()

        if len(value) > 3 and value[-2] == "+" and value[-1].isdigit():
            # This technically means that we accept slightly more
            # formats than ISO 8601 strings, since under that
            # specification, two digits after the '+' are required
            # for an hour offset, but ODL doesn't have this
            # requirement.  If we find only one digit, we'll
            # just assume it means an hour and insert a zero so
            # that it can be parsed.
            tokens = value.rpartition("+")
            value = tokens[0] + "+0" + tokens[-1]

        # Try the narrower parsers first; the ValueError from the final,
        # most general one is allowed to propagate to the caller.
        for parse in (isop.parse_isodate, isop.parse_isotime):
            try:
                return parse(value)
            except ValueError:
                pass

        return isop.isoparse(value)

    def decode_unquoted_string(self, value: str) -> str:
        """Overrides parent function since the ODLDecoder has a more narrow
        definition of what is allowable as an unquoted string than the
        PVLDecoder does.
        """
        # Skip ODLDecoder's identifier-only restriction by calling the
        # next implementation after it in the method resolution order.
        return super(ODLDecoder, self).decode_unquoted_string(value)