Source code for georead._dump_utils

"""Utils for dumping keywords data."""

from collections.abc import Callable, Sequence
from contextlib import ExitStack
import copy
import numbers
import pathlib
from typing import Any, Protocol, cast
import numpy as np
import numpy.typing as npt
import pandas as pd
from ._data_directory import (
    INT_NAN,
    ArraySpecification,
    ArrayWithUnits,
    DataType,
    DataTypes,
    DATA_DIRECTORY,
    IntType,
    KeywordSpecification,
    NoDataSpecification,
    ObjectSpecification,
    ParametersSpecification,
    StringSpecification,
    TableSpecification,
    ValueType,
    get_dynamic_keyword_specification,
)

MAX_STRLEN = 40

INPLACE_ARRAYS = ['TSTEP']


class PWriteBuf(Protocol):
    """Protocol for writable buffer."""

    def write(self, s: str, /) -> int | None:
        """
        Write string to buffer.

        Parameters
        ----------
        s : str
            String to write.

        Returns
        -------
        int | None

        """
        pass


def format_string_val(
    val: pd.Timestamp | str,
    keyword_spec: StringSpecification | None | ObjectSpecification,
) -> str:
    """
    Format dates.

    Parameters
    ----------
    val : pd.Timestamp | str
        Value to format.
    keyword_spec : StringSpecification | None | ObjectSpecification
        Keyword specification.

    Returns
    -------
    str
        Formatted string.

    """
    if keyword_spec is not None and keyword_spec.date:
        if not isinstance(val, pd.Timestamp):
            raise ValueError('`val` should be of type pandas.Timestamp.')
        d = val.strftime('%d %b %Y').upper()
        if val.hour or val.minute or val.second:
            t = val.strftime('%H:%M:%S')
            return ' '.join((d, t))
        return d
    if not isinstance(val, str):
        raise ValueError('Value should be of type str.')
    return val


def _dump_string(keyword_spec: KeywordSpecification, val: ValueType, buf: PWriteBuf):
    if not (isinstance(val, str) or isinstance(val, pd.Timestamp)):
        raise ValueError('`val` should be of type `str` or `pandas.Timestamp`.')
    spec = keyword_spec.specification
    if not (isinstance(spec, StringSpecification) or spec is None):
        raise ValueError(
            '`keyword_spec.specification` should be of type StringSpecification.'
        )
    _ = buf.write('\n'.join([keyword_spec.keyword, format_string_val(val, spec), '/']))


def _dump_array(
    keyword_spec: KeywordSpecification,
    val: ValueType,
    buf: PWriteBuf,
    include_dir: pathlib.Path | None,
):
    if keyword_spec.keyword in INPLACE_ARRAYS:
        inplace = True
    else:
        inplace = False

    if not isinstance(val, np.ndarray):
        raise ValueError('`val` should be of type `numpy.ndarray`.')

    spec = keyword_spec.specification
    if not isinstance(spec, ArraySpecification):
        raise ValueError(
            '`keyword_spec.specification` should be of type ArraySpecification.'
        )

    if spec.dtype in (bool, int):
        fmt = '%d'
    else:
        fmt = '%f'
    if inplace:
        _dump_array_ascii(buf, val.reshape(-1), header=keyword_spec.keyword, fmt=fmt)
        _ = buf.write('/')
        return
    if include_dir is None:
        raise ValueError(
            '`include_dir` should be provided if array is not supposed to be dumped inplace.'
        )

    with open(include_dir / f'{keyword_spec.keyword}.inc', 'w') as inc_buf:
        _dump_array_ascii(
            inc_buf, val.reshape(-1), fmt=fmt, header=keyword_spec.keyword
        )
        _ = inc_buf.write('/')
    _ = buf.write(
        '\n'.join(
            (
                'INCLUDE',
                '"' + '/'.join((include_dir.name, f'{keyword_spec.keyword}.inc')) + '"',
            )
        )
    )
    _ = buf.write('\n/')


def _dump_table(keyword_spec: KeywordSpecification, val: ValueType, buf: PWriteBuf):
    if not isinstance(val, Sequence):
        raise ValueError('`val` should be of type `Sequence[pandas.DataFrame]`')

    _ = buf.write(keyword_spec.keyword)
    spec = keyword_spec.specification
    if not isinstance(spec, TableSpecification):
        raise ValueError(
            '`keyword_spec.specification` should be of type TableSpecification.'
        )

    domain = spec.domain
    for table in val:
        if spec.header:
            if not isinstance(table, Sequence):
                raise ValueError(
                    '`table` should be of type `Sequence[pandas.DataFrame]`'
                )
            header = table[1]
            data = table[0]
        else:
            header = None
            data = table
        if not isinstance(data, pd.DataFrame):
            raise ValueError('`val` should be of type `Sequence[pandas.DataFrame]`')
        domain_val = domain
        if header is not None:
            _ = buf.write('\n')

            if not isinstance(header, pd.DataFrame):
                raise ValueError('`header` should be of type `pandas.DataFrame.`')
            _dump_statement(header, buf, closing_slash=False, new_line=False)
        if domain_val is not None and len(domain_val) == 2:
            _dump_multitable(data, buf)
            continue
        _ = buf.write('\n')
        row_iterator = (
            data.itertuples() if domain is not None else data.itertuples(index=False)
        )
        for row in row_iterator:
            vals = list(row)
            vals = [_nan_to_none(v) for v in vals]  # pyright: ignore[reportAny]
            str_representaions = [
                _string_representation(v) if v is not None else '' for v in vals
            ]
            str_representaions = _replace_empty_vals(str_representaions)
            _ = buf.write('\t'.join([v for v in str_representaions]) + '\n')
        _ = buf.write('/')


def _dump_multitable(val: pd.DataFrame, buf: PWriteBuf):
    _ = buf.write('\n')
    for _, df in val.groupby(level=0):
        for i, (ind1, row) in enumerate(df.iterrows()):
            vals = row.values.tolist()
            if not isinstance(ind1, Sequence):
                raise ValueError('`val` should have multiindex.')
            if i == 0:
                vals = [*ind1] + vals
            else:
                vals = [ind1[1]] + vals
            vals = [_nan_to_none(v) for v in vals]  # pyright: ignore[reportArgumentType]
            str_representations = [
                _string_representation(v) if v is not None else '' for v in vals
            ]
            str_representations = _replace_empty_vals(str_representations)
            if i != 0:
                str_representations = [''] + str_representations
            if i == len(df) - 1:
                str_representations = str_representations + ['/']
            _ = buf.write('\t'.join(str_representations) + '\n')
    _ = buf.write('/')


def _dump_single_statement(
    keyword_spec: KeywordSpecification, val: ValueType, buf: PWriteBuf
):
    _ = buf.write(keyword_spec.keyword + '\n')
    if not isinstance(val, pd.DataFrame):
        raise ValueError('`val` should be of type `pandas.DataFrame`.')
    _dump_statement(val, buf, closing_slash=False)
    _ = buf.write('/')


def _dump_statement_list(
    keyword_spec: KeywordSpecification, val: ValueType, buf: PWriteBuf
):
    if not isinstance(val, pd.DataFrame):
        raise ValueError('`val` should be of type `pd.DataFrame`.')
    _ = buf.write(keyword_spec.keyword + '\n')
    for row in val.itertuples(index=False):
        _dump_statement(row, buf, closing_slash=True)
    _ = buf.write('/')


def _dump_records(keyword_spec: KeywordSpecification, val: ValueType, buf: PWriteBuf):
    _ = buf.write(keyword_spec.keyword + '\n')
    if not isinstance(val, Sequence):
        raise ValueError('`val` should be of type Sequence[pandas.DataFrame].')
    for v in val:
        if not isinstance(v, (pd.DataFrame, np.ndarray)):
            raise ValueError('`val` should be of type Sequence[pandas.DataFrame].')
        _dump_statement(v, buf, closing_slash=True)


def _dump_object_list(
    keyword_spec: KeywordSpecification, val: ValueType, buf: PWriteBuf
):
    _ = buf.write(keyword_spec.keyword + '\n')
    spec = keyword_spec.specification
    if not (isinstance(spec, ObjectSpecification) or spec is None):
        raise ValueError(
            '`keyword_spec.specification` should be of type `ObjectSpecification`.'
        )
    if not isinstance(val, Sequence):
        raise ValueError('`val` should be of type Sequence[str | pd.Timestamp]')

    for o in val:
        if not (isinstance(o, str) or isinstance(o, pd.Timestamp)):
            raise ValueError('`val` should be of type Sequence[str | pd.Timestamp]')
        _ = buf.write(f'{format_string_val(o, spec)}')
        if spec is not None and spec.terminated:
            _ = buf.write(' /')
        _ = buf.write('\n')
    _ = buf.write('/')


def _dump_parameters(
    keyword_spec: KeywordSpecification, val: ValueType, buf: PWriteBuf
):
    spec = keyword_spec.specification
    if not isinstance(spec, ParametersSpecification):
        raise ValueError(
            '`keyword_spec.specification` should be of type ParametersSpecification.'
        )
    if not isinstance(val, dict):
        raise ValueError

    if not all(isinstance(v, str) or v is None for v in val.values()):
        raise ValueError('`val` should be of type dict[str, str|None]')

    val = cast(dict[str, str | None], val)

    if spec.tabulated:
        return _dump_tabulated_parameters(keyword_spec, val, buf)

    _ = buf.write(keyword_spec.keyword + '\n')
    res = ' '.join([f'{k}' if v is None else f'{k}={v}' for k, v in val.items()])
    _ = buf.write(res)
    _ = buf.write('\n/')


def _dump_tabulated_parameters(
    keyword_spec: KeywordSpecification, val: dict[str, str | None], buf: PWriteBuf
):
    _ = buf.write(keyword_spec.keyword + '\n')
    for key, data in val.items():
        if not isinstance(data, str):
            raise ValueError('values of `val` should be strings.')
        _ = buf.write('\t'.join((key, data)) + '\n')
    _ = buf.write('/')


def _dump_array_with_units(
    keyword_spec: KeywordSpecification, val: ValueType, buf: PWriteBuf
):
    _ = buf.write(keyword_spec.keyword + '\n')
    if not isinstance(val, ArrayWithUnits):
        raise ValueError('`val` should be of type `ArrayWithUnits`')
    _ = buf.write(val.units + '\n')
    spec = keyword_spec.specification
    if not isinstance(spec, ArraySpecification):
        raise ValueError(
            '`keyword_spec.specification` should be of type `ArraySpecification`.'
        )
    if spec.dtype in (bool, int):
        fmt = '%d'
    else:
        fmt = '%g'
    _dump_array_ascii(buf, val.data.reshape(-1), fmt=fmt)
    _ = buf.write('/')


def _dump_no_data(
    keyword_spec: KeywordSpecification,
    buf: PWriteBuf,
):
    _ = buf.write(f'{keyword_spec.keyword}')
    spec = keyword_spec.specification
    if spec is not None and not isinstance(spec, NoDataSpecification):
        raise ValueError(
            '`keyword_spec.specification` should be None or of type `NoDataSpecification`.'
        )
    if spec is not None and spec.terminated:
        _ = buf.write('\n/')


DUMP_ROUTINES: dict[
    DataTypes | None,
    Callable[[KeywordSpecification, ValueType, PWriteBuf, pathlib.Path | None], None],
] = {
    DataTypes.OBJECT_LIST: lambda keyword_spec, val, buf, _: _dump_object_list(
        keyword_spec, val, buf
    ),
    DataTypes.STRING: lambda keyword_spec, val, buf, _: _dump_string(
        keyword_spec, val, buf
    ),
    DataTypes.STATEMENT_LIST: lambda keyword_spec, val, buf, _: _dump_statement_list(
        keyword_spec, val, buf
    ),
    DataTypes.PARAMETERS: lambda keyword_spec, val, buf, _: _dump_parameters(
        keyword_spec, val, buf
    ),
    DataTypes.ARRAY: _dump_array,
    DataTypes.TABLE_SET: lambda keyword_spec, val, buf, _=None: _dump_table(
        keyword_spec, val, buf
    ),
    None: lambda keyword_spec, _, buf, ___: _dump_no_data(keyword_spec, buf),
    DataTypes.SINGLE_STATEMENT: lambda keyword_spec, val, buf, _: (
        _dump_single_statement(keyword_spec, val, buf)
    ),
    DataTypes.RECORDS: lambda keyword_spec, val, buf, _: _dump_records(
        keyword_spec, val, buf
    ),
    DataTypes.ARRAY_WITH_UNITS: lambda keyword_spec, val, buf, _: (
        _dump_array_with_units(keyword_spec, val, buf)
    ),
}


def _dump_statement(
    val: pd.DataFrame | pd.Series | np.ndarray | tuple[Any, ...],  # pyright: ignore[reportExplicitAny]
    buf: PWriteBuf,
    closing_slash: bool = True,
    new_line: bool = True,
):
    if isinstance(val, pd.DataFrame):
        if val.shape[0] != 1:
            raise ValueError('Val shoud have exactly one row.')
        vals = [val[col][0] for col in val.columns]
    elif isinstance(val, pd.Series):
        vals = val.values
    else:
        vals = val
    vals = [_nan_to_none(v) for v in vals]  # pyright: ignore[reportAny]
    str_representaions = [
        _string_representation(v) if v is not None else '' for v in vals
    ]
    str_representaions = _replace_empty_vals(str_representaions)
    if len(str_representaions) == 0:
        str_representaions.append('*')
    result = '\t'.join(str_representaions)
    nl = '\n' if new_line else ''

    result += nl if not closing_slash else '/' + nl
    _ = buf.write(result)


def _string_representation(v: Any):  # pyright: ignore[reportAny, reportExplicitAny]
    if v is None:
        return ''
    if isinstance(v, numbers.Number):
        r = str(v)
        if 'e' in r:
            assert isinstance(v, (float, np.floating))
            return np.format_float_scientific(
                v, unique=True, trim='-', exp_digits=1
            ).upper()
        return r
    if isinstance(v, str):
        if any(symbol in v for symbol in ' \t'):
            return "'" + v + "'"
    return str(v)


def _replace_empty_vals(vals: Sequence[str]) -> list[str]:
    vals = copy.copy(vals)
    vals = list(vals)
    while True:
        start = None
        end = None
        for i, s in enumerate(vals):
            if s != '':
                if start is not None:
                    break
            else:
                if start is None:
                    start = i
                    end = i
                else:
                    end = i
        if start is None:
            break
        else:
            assert end is not None
            if end == len(vals) - 1:
                del vals[start:]
                continue
            if start == end:
                replacement = '*'
            else:
                replacement = f'{end - start + 1}*'
            vals[start : end + 1] = [replacement]
    return vals


def _nan_to_none(val: IntType | np.floating | str):
    if isinstance(val, (numbers.Number)) and np.isnan(val):
        return None
    if val == INT_NAN:
        return None
    if val == '':
        return None
    return val


[docs] def dump( data: DataType, path: pathlib.Path, inplace_schedule: bool = False, filename: str | None = None, ) -> None: """ Dump model data. Parameters ---------- data : DataType Model data. path : pathlib.Path Path to dump the model. inplace_schedule : bool, default False. Should schedule be dumped inplace. filename : str | None, default None. Name of the main model file, if None filename is taken from `TITLE` field in the `RUNSPEC`. """ if not path.exists(): path.mkdir() include_dir = path / 'include' if not include_dir.exists(): include_dir.mkdir() if filename is None: for key, val in data['RUNSPEC']: if key == 'TITLE': filename = f'{val}.data' if filename is None: raise ValueError( 'Filename is not specified and no TITLE keyword in RUNSPEC section.' ) with ExitStack() as stack: buf = stack.enter_context(open(path / filename, 'w')) for section in ( '', 'RUNSPEC', 'GRID', 'EDIT', 'PROPS', 'REGIONS', 'SOLUTION', 'SUMMARY', 'SCHEDULE', ): if section in data: if section != '': _ = buf.write(f'{section}\n\n') if section == 'SCHEDULE' and not inplace_schedule: schedule_path = include_dir / 'schedule.inc' _ = buf.write('INCLUDE\n') _ = buf.write(('"' + str(schedule_path.relative_to(path))) + '"') _ = buf.write('\n/\n\n') buf_tmp = stack.enter_context(open(schedule_path, 'w')) else: buf_tmp = buf for key, val in data[section]: spec = DATA_DIRECTORY[key] if spec is None: spec = get_dynamic_keyword_specification(key, data) DUMP_ROUTINES[spec.type](spec, val, buf_tmp, include_dir) _ = buf_tmp.write('\n\n')
def _dump_array_ascii( buffer: PWriteBuf, array: npt.NDArray[np.floating | np.integer | np.bool_], header: str | None = None, fmt: str = '%f', compressed: bool = True, ): """ Write array-like data into an ASCII buffer. Parameters ---------- buffer : PWriteBuf Destination buffer. array : 1d, array-like Array to be saved. header : str, optional String to be written line before the array. fmt : str or sequence of strs, optional Format to be passed into ``numpy.savetxt`` function. Default to '%f'. compressed : bool If True, uses compressed typing style """ if header is not None: _ = buffer.write(header + '\n') if compressed: i = 0 items_written = 0 while i < len(array): count = 1 while (i + count < len(array)) and (array[i + count] == array[i]): count += 1 if count <= 4: _ = buffer.write(' '.join([fmt % array[i]] * count)) items_written += count else: _ = buffer.write(''.join((str(count), '*', fmt % array[i]))) items_written += 1 i += count if items_written > MAX_STRLEN: _ = buffer.write('\n') items_written = 0 elif i < len(array): _ = buffer.write(' ') _ = buffer.write('\n') else: for i in range(0, len(array), MAX_STRLEN): _ = buffer.write(' '.join([fmt % d for d in array[i : i + MAX_STRLEN]])) # pyright: ignore[reportAny] _ = buffer.write('\n') _ = buffer.write('\n')