Module snowpat.snowpackreader.json_converter

Expand source code
from typing import Dict, Optional, Tuple
import datetime
import json
import numpy as np
from .Snowpack import Snowpack
import warnings
import math

def read_json_profile(json_file: str, max_layer_size: Optional[float] = None) -> Tuple[Snowpack, Dict, datetime.datetime]:
    """
    Read a snowpack profile from a JSON file and convert it to a Snowpack object.

    Args:
        json_file (str): Path to the JSON file containing the snowpack profile.
        max_layer_size (Optional[float]): Maximum allowed layer size. Layers
            exceeding this are split into equal sub-layers and the per-layer
            property values are replicated for each sub-layer.

    Returns:
        Tuple[Snowpack, Dict, datetime.datetime]: A tuple containing:
            - Snowpack object with the profile data
            - Dictionary with station metadata
            - Profile date

    Raises:
        ValueError: If density layers cannot be matched with the layer boundaries.
    """
    with open(json_file, 'r') as f:
        data = json.load(f)

    # Station metadata, taken verbatim from the JSON "position" record
    metadata = {
        "StationName": data["name"],
        "Latitude": data["position"]["latitude"],
        "Longitude": data["position"]["longitude"],
        "Altitude": data["position"]["altitude"],
        "SlopeAngle": data["position"]["angle"],
        "SlopeAzi": data["position"]["_azimuth"]
    }

    snowpack = Snowpack()

    # Only the first profile is read (single observation date assumed)
    profile = data["profiles"][0]
    profile_date = datetime.datetime.fromisoformat(profile["date"])

    # The hardness entry defines the layer structure for the whole profile
    hardness_data = profile["hardness"]["elements"][0]["layers"]

    # Sort layers bottom-up so boundaries come out in ascending order
    hardness_data.sort(key=lambda x: x["bottom"])

    # Layer boundaries: bottom of the lowest layer, then every layer top
    boundaries = []
    for layer in hardness_data:
        if not boundaries:  # first layer also contributes its bottom
            boundaries.append(layer["bottom"])
        boundaries.append(layer["top"])

    hardness_values = [layer["value"] for layer in hardness_data]
    # Hand-hardness indices never exceed 6, so larger values are assumed to be
    # on the Newton scale -- TODO confirm against the producing software
    snowpack.isNewton = any(v > 6 for v in hardness_values)

    # n_duplications[i] = number of sub-layers original layer i was split into;
    # used below to replicate per-layer property values after splitting
    n_duplications = []
    if max_layer_size is not None:
        new_boundaries = []
        for i in range(len(boundaries) - 1):
            layer_size = boundaries[i + 1] - boundaries[i]
            if layer_size > max_layer_size:
                n_splits = math.ceil(layer_size / max_layer_size)
                split_points = np.linspace(boundaries[i], boundaries[i + 1], n_splits + 1, dtype=np.float64)
                # Skip the last point: it is added by the next iteration
                # (or by the final-boundary append below)
                new_boundaries.extend(split_points[:-1])
            else:
                n_splits = 1
                new_boundaries.append(boundaries[i])
            n_duplications.append(n_splits)

        # Add final boundary
        new_boundaries.append(boundaries[-1])
        boundaries = np.array(new_boundaries, dtype=np.float64)
    else:
        boundaries = np.array(boundaries, dtype=np.float64)

    hardness_values = np.array(hardness_values, dtype=np.float64)
    if n_duplications:
        assert len(n_duplications) == len(hardness_values), "Number of duplications does not match number of hardness values"
        hardness_values = np.repeat(hardness_values, n_duplications)
    # 0501 = layer boundaries, 0534 = hardness (SNOWPACK .pro parameter codes)
    snowpack.set_param("0501", boundaries, len(boundaries))
    snowpack.set_param("0534", hardness_values, len(hardness_values))

    # Derived per-layer geometry: midpoints and full thicknesses.
    # BUGFIX: thickness was previously (top - bottom) / 2, i.e. HALF the
    # geometric thickness; the density matching below reconstructs layer
    # bounds as midpoint +- thickness/2 and therefore covered only half of
    # each layer.
    layer_middle = [
        (snowpack.layer_boundaries[i + 1] + snowpack.layer_boundaries[i]) / 2
        for i in range(snowpack.layer_boundaries.size - 1)
    ]
    layer_thicknes = [
        snowpack.layer_boundaries[i + 1] - snowpack.layer_boundaries[i]
        for i in range(snowpack.layer_boundaries.size - 1)
    ]

    layer_middle = np.array(layer_middle)
    layer_thicknes = np.array(layer_thicknes)

    snowpack.data["layer middle"] = layer_middle
    snowpack.data["layer thickness"] = layer_thicknes
    # There is one fewer layer than boundaries; trim the mask accordingly
    if len(snowpack._height_mask) > len(layer_middle):
        snowpack._height_mask = np.delete(snowpack._height_mask, -1)

    # Temperature: the only property that is interpolated (point measurements)
    if "temperature" in profile:
        temp_data = profile["temperature"]["elements"][0]["layers"]
        temp_values = []
        temp_pos = []
        for layer in temp_data:
            temp_values.append(layer["value"])
            temp_pos.append(layer["bottom"])

        temp_pos = np.array(temp_pos, dtype=np.float64)
        temp_values = np.array(temp_values, dtype=np.float64)

        # Interpolate temperatures to layer midpoints (0503)
        temp_values = np.interp(layer_middle, temp_pos, temp_values)
        snowpack.set_param("0503", temp_values, len(temp_values))

    # Density: replicated for split layers when the density layering matches
    # the hardness layering, otherwise averaged onto the layer grid
    if "density" in profile:
        density_data = profile["density"]["elements"][0]["layers"]
        density_top = []
        density_bottom = []
        density_values = []
        for layer in density_data:
            if "value" in layer and layer["value"] is not None:
                density_values.append(layer["value"])
            else:
                density_values.append(0.0)  # Default value for missing data
            # BUGFIX: top/bottom were previously only recorded for layers with
            # a value, desynchronizing density_values from density_top/bottom
            density_top.append(layer["top"])
            density_bottom.append(layer["bottom"])

        if density_values:
            density_values = np.array(density_values, dtype=np.float64)
            density_top = np.array(density_top, dtype=np.float64)
            density_bottom = np.array(density_bottom, dtype=np.float64)
            # np.unique already returns sorted values
            density_boundaries = np.unique(np.concatenate([density_top, density_bottom]))
            # np.array_equal is shape-safe, unlike elementwise == on arrays of
            # different lengths
            if len(density_values) != len(layer_middle) or not np.array_equal(density_boundaries, snowpack.layer_boundaries):
                print("Computing Mean densities for layers, provided density layers do not match layer boundaries")
                new_density_values = np.zeros(len(layer_middle))
                last_density_pos = 0
                for i in range(len(layer_middle)):
                    midpoint = layer_middle[i]
                    thickness = layer_thicknes[i]
                    layer_bottom = midpoint - thickness / 2
                    layer_top = midpoint + thickness / 2
                    # Weighted mean: accumulate overlap of density intervals
                    # with [layer_bottom, layer_top]
                    total_weight = 0.0
                    weighted_sum = 0.0
                    d_top = density_top[last_density_pos]
                    d_bottom = density_bottom[last_density_pos]
                    if d_top < layer_bottom:
                        # Current density interval lies entirely below this
                        # layer; jump to the first one that reaches it
                        last_density_pos = np.where(density_top >= layer_bottom)[0][0]
                        if not (last_density_pos > 0):
                            raise ValueError("Could not match density layers with layer boundaries")
                        d_top = density_top[last_density_pos]
                        d_bottom = density_bottom[last_density_pos]
                    if d_bottom > layer_top:
                        continue  # no density information for this layer

                    while d_bottom < layer_top:
                        # BUGFIX: clip the overlap at layer_bottom as well;
                        # intervals straddling the lower bound were overcounted
                        layer_overlap = min(d_top, layer_top) - max(d_bottom, layer_bottom)
                        total_weight += layer_overlap
                        weighted_sum += layer_overlap * density_values[last_density_pos]
                        d_bottom = d_top
                        if d_bottom > layer_top:
                            break
                        last_density_pos += 1
                        if last_density_pos >= len(density_values):
                            warnings.warn("Not enough density layers to match with layer boundaries")
                            break
                        d_top = density_top[last_density_pos]
                        d_bottom = density_bottom[last_density_pos]

                    if total_weight > 0:
                        new_density_values[i] = weighted_sum / total_weight
                    else:
                        new_density_values[i] = 0.0
                density_values = new_density_values
            elif n_duplications:
                assert len(density_values) == len(n_duplications), "Density values and duplications do not match, something went wrong when assigning layer splits"
                density_values = np.repeat(density_values, n_duplications)
            snowpack.set_param("0502", density_values, len(density_values))

    # Grain shape: encode as 3-digit Swiss Code (primary/secondary/tertiary)
    if "grainshape" in profile:
        grain_data = profile["grainshape"]["elements"][0]["layers"]
        grain_codes = []
        for layer in grain_data:
            if isinstance(layer["value"], dict):
                primary = layer["value"]["primary"]
                secondary = layer["value"].get("secondary", "")
                tertiary_code = 0
                # Special case for MFcr (melt-freeze crust): MF primary with
                # tertiary flag 2
                if primary == "MFcr":
                    primary_code = 7
                    secondary_code = TYPES_TO_CODE.get(secondary, primary_code)
                    tertiary_code = 2
                else:
                    primary_code = TYPES_TO_CODE.get(primary, -1)
                    secondary_code = TYPES_TO_CODE.get(secondary, primary_code)
                    if primary_code == -1:
                        # Unknown grain type -> sentinel code -999
                        primary_code = -9
                        secondary_code = 9
                        tertiary_code = 9
                code = int(f"{primary_code}{secondary_code}{tertiary_code}")
            else:
                code = TYPES_TO_CODE.get(layer["value"], -999)

            grain_codes.append(code)

        if grain_codes:
            grain_codes = np.array(grain_codes, dtype=np.int16)
            if n_duplications:
                assert len(grain_codes) == len(n_duplications), "Grain codes and duplications do not match, something went wrong when assigning layer splits"
                grain_codes = np.repeat(grain_codes, n_duplications)
            snowpack.set_param("0513", grain_codes, len(grain_codes))

    # Grain size: average value per layer, replicated for split layers
    if "grainsize" in profile:
        size_data = profile["grainsize"]["elements"][0]["layers"]
        size_values = []
        for layer in size_data:
            if "value" in layer and layer["value"] is not None:
                size_values.append(layer["value"]["avg"])
            else:
                size_values.append(0.0)  # Default value for missing data
        if size_values:
            size_values = np.array(size_values, dtype=np.float64)
            if n_duplications:
                assert len(size_values) == len(n_duplications), "Grain sizes and duplications do not match, something went wrong when assigning layer splits"
                size_values = np.repeat(size_values, n_duplications)
            snowpack.set_param("0512", size_values, len(size_values))

    # Wetness: replicated for split layers
    if "wetness" in profile:
        wetness_data = profile["wetness"]["elements"][0]["layers"]
        wetness_values = []
        for layer in wetness_data:
            if "value" in layer and layer["value"] is not None:
                wetness_values.append(layer["value"])
            else:
                wetness_values.append(0.0)  # Default value for missing data
        if wetness_values:
            wetness_values = np.array(wetness_values, dtype=np.float64)
            if n_duplications:
                assert len(wetness_values) == len(n_duplications), "Wetness values and duplications do not match, something went wrong when assigning layer splits"
                wetness_values = np.repeat(wetness_values, n_duplications)
            snowpack.set_param("0506", wetness_values, len(wetness_values))

    snowpack._parsed = True

    # Stability indices are optional; a failure must not abort the read
    try:
        print("Calculating stability indices...")
        stability_indices = snowpack.calculate_stability_indices()
        snowpack.weak_layer = stability_indices
    except Exception as e:
        warnings.warn(f"Could not calculate stability indices: {str(e)}")

    return snowpack, metadata, profile_date

# Mapping from grain-shape abbreviations used in the JSON profiles to the
# single-digit codes used when composing the Swiss grain-shape code.
TYPES_TO_CODE: Dict[str, int] = {
    "PPgp": 0,
    "PP": 1,
    "DF": 2,
    "RG": 3,
    "FC": 4,
    "DH": 5,
    "SH": 6,
    "MF": 7,
    "IF": 8,
    "FCxr": 9,
}

Functions

def read_json_profile(json_file: str, max_layer_size: Optional[float] = None) -> Tuple[snowpat.snowpackreader.Snowpack.Snowpack, Dict, datetime.datetime]

Read a snowpack profile from a JSON file and convert it to a Snowpack object.

Args

json_file : str
Path to the JSON file containing the snowpack profile
max_layer_size : Optional[float]
Maximum allowed layer size. Layers exceeding this will be split.

Returns

Tuple[Snowpack, Dict, datetime.datetime]
A tuple containing: a Snowpack object with the profile data, a dictionary with station metadata, and the profile date.
Expand source code
def read_json_profile(json_file: str, max_layer_size: Optional[float] = None) -> Tuple[Snowpack, Dict, datetime.datetime]:
    """
    Read a snowpack profile from a JSON file and convert it to a Snowpack object.

    Args:
        json_file (str): Path to the JSON file containing the snowpack profile
        max_layer_size (Optional[float]): Maximum allowed layer size. Layers exceeding this will be split.

    Returns:
        Tuple[Snowpack, Dict, datetime.datetime]: A tuple containing:
            - Snowpack object with the profile data
            - Dictionary with station metadata
            - Profile date

    Raises:
        ValueError: If density layers cannot be matched with the layer boundaries.
    """
    with open(json_file, 'r') as f:
        data = json.load(f)
    
    # Extract station metadata (taken verbatim from the JSON "position" record)
    metadata = {
        "StationName": data["name"],
        "Latitude": data["position"]["latitude"],
        "Longitude": data["position"]["longitude"],
        "Altitude": data["position"]["altitude"],
        "SlopeAngle": data["position"]["angle"],
        "SlopeAzi": data["position"]["_azimuth"]
    }
    
    # Create Snowpack object
    snowpack = Snowpack()
    
    # Get the first profile (assuming single date)
    profile = data["profiles"][0]
    
    # Get profile date
    profile_date = datetime.datetime.fromisoformat(profile["date"])
    
    # Process layer data
    # NOTE(review): 'layers' and 'duplicate_layers' are assigned but never used
    layers = []
    
    # Extract layers from hardness data (which contains the layer structure)
    hardness_data = profile["hardness"]["elements"][0]["layers"]
    
    # Sort layers by bottom height to ensure correct ordering
    hardness_data.sort(key=lambda x: x["bottom"])
    
    duplicate_layers = []
        
    # Convert layer boundaries to format needed by Snowpack:
    # bottom of the lowest layer, then every layer top (ascending)
    boundaries = []
    for layer in hardness_data:
        if not boundaries:  # First layer
            boundaries.append(layer["bottom"])
        boundaries.append(layer["top"])
    
    # Convert hardness values to Newton scale if needed (assuming values > 6 indicate Newton scale)
    hardness_values = []
    is_newton = any(layer["value"] > 6 for layer in hardness_data)
    snowpack.isNewton = is_newton
    for layer in hardness_data:
        hardness_values.append(layer["value"])
    
    # n_duplications[i] = number of sub-layers original layer i was split into;
    # used later to replicate per-layer property values
    n_duplications = []
    if max_layer_size is not None:
        new_boundaries = []
        for i in range(len(boundaries) - 1):
            layer_size = boundaries[i+1] - boundaries[i]
            if layer_size > max_layer_size:
                # Calculate splits using numpy for better numerical precision
                n_splits = math.ceil(layer_size / max_layer_size)
                split_points = np.linspace(boundaries[i], boundaries[i+1], n_splits + 1, dtype=np.float64)
                
                # Add all points except the last one (it will be added in the next iteration)
                new_boundaries.extend(split_points[:-1])
            else:
                n_splits = 1
                new_boundaries.append(boundaries[i])
            n_duplications.append(n_splits)
            
        # Add final boundary
        new_boundaries.append(boundaries[-1])
        
        # Convert to numpy arrays
        boundaries = np.array(new_boundaries, dtype=np.float64)
    else:
        boundaries = np.array(boundaries, dtype=np.float64)
    
    hardness_values = np.array(hardness_values, dtype=np.float64)
    if n_duplications:
        assert len(n_duplications) == len(hardness_values), "Number of duplications does not match number of hardness values"
        hardness_values = np.repeat(hardness_values, n_duplications)
    # Set layer boundaries and hardness (SNOWPACK .pro parameter codes)
    snowpack.set_param("0501", boundaries, len(boundaries))
    snowpack.set_param("0534", hardness_values, len(hardness_values))

    # make new fields, so it is clearer, where the layers actually are
    layer_middle = [
        (snowpack.layer_boundaries[i + 1] + snowpack.layer_boundaries[i]) / 2
        for i in range(snowpack.layer_boundaries.size - 1)
    ]
    # NOTE(review): this is HALF of (top - bottom), but the density matching
    # below treats it as the full thickness (midpoint +- thickness/2), so
    # only half of each layer is covered — one of the two is wrong; confirm
    # the intended semantics of "layer thickness"
    layer_thicknes = [
        (snowpack.layer_boundaries[i + 1] - snowpack.layer_boundaries[i]) / 2
        for i in range(snowpack.layer_boundaries.size - 1)
    ]

    layer_middle = np.array(layer_middle)
    layer_thicknes = np.array(layer_thicknes)

    snowpack.data["layer middle"] = layer_middle
    snowpack.data["layer thickness"] = layer_thicknes
    # There is one fewer layer than boundaries; trim the mask accordingly
    if len(snowpack._height_mask) > len(layer_middle):
        snowpack._height_mask = np.delete(snowpack._height_mask, -1)

    # Process temperature data - the only property that should be interpolated
    if "temperature" in profile:
        temp_data = profile["temperature"]["elements"][0]["layers"]
        temp_values = []
        temp_pos = []
        for layer in temp_data:
            temp_values.append(layer["value"])
            temp_pos.append(layer["bottom"])
        
        # Convert to numpy arrays for efficient interpolation
        temp_pos = np.array(temp_pos, dtype=np.float64)
        temp_values = np.array(temp_values, dtype=np.float64)
        
        # Interpolate temperatures to layer midpoints
        temp_values = np.interp(layer_middle, temp_pos, temp_values)
        snowpack.set_param("0503", temp_values, len(temp_values))

    # Process density data - replicate values for split layers
    if "density" in profile:
        density_data = profile["density"]["elements"][0]["layers"]
        density_top = []
        density_bottom = []
        density_values = []
        for layer in density_data:
            if "value" in layer and layer["value"] is not None:
                density_values.append(layer["value"])
                density_top.append(layer["top"])
                density_bottom.append(layer["bottom"])
            else:
                # NOTE(review): top/bottom are not appended in this branch, so
                # density_values can desynchronize from density_top/bottom and
                # misalign the weighted-mean matching below
                density_values.append(0.0)  # Default value for missing data

        if density_values:
            # Convert to numpy array
            density_values = np.array(density_values, dtype=np.float64)
            density_top = np.array(density_top, dtype=np.float64)
            density_bottom = np.array(density_bottom, dtype=np.float64)
            density_boundaries = np.unique(np.concatenate([density_top, density_bottom]))
            # NOTE(review): np.unique already returns sorted values; this sort
            # is a no-op
            density_boundaries.sort()
            # NOTE(review): elementwise == on arrays of different lengths is
            # fragile; np.array_equal would be shape-safe here
            if len(density_values) != len(layer_middle) or not all(density_boundaries == snowpack.layer_boundaries):
                print("Computing Mean densities for layers, provided density layers do not match layer boundaries")
                new_density_values = np.zeros(len(layer_middle))
                last_density_pos = 0
                for i in range(len(layer_middle)):
                    midpoint = layer_middle[i]
                    thickness = layer_thicknes[i]
                    layer_bottom = midpoint - thickness / 2
                    layer_top = midpoint + thickness / 2
                    # Weighted mean: compute overlap of density intervals with this layer
                    total_weight = 0.0
                    weighted_sum = 0.0
                    d_top = density_top[last_density_pos]
                    d_bottom = density_bottom[last_density_pos]
                    if d_top < layer_bottom:
                        # Current density interval lies entirely below this
                        # layer; jump to the first one that reaches it
                        last_density_pos = np.where(density_top >= layer_bottom)[0][0]
                        if not (last_density_pos > 0):
                            raise ValueError("Could not match density layers with layer boundaries")
                        d_top = density_top[last_density_pos]
                        d_bottom = density_bottom[last_density_pos]
                    if d_bottom > layer_top:
                        # No density information for this layer
                        continue
                    
                    while d_bottom < layer_top:
                        # NOTE(review): the overlap is not clipped at
                        # layer_bottom, so an interval straddling the lower
                        # bound is overcounted
                        layer_overlap = d_top - d_bottom if d_top < layer_top else layer_top - d_bottom
                        total_weight += layer_overlap
                        weighted_sum += layer_overlap * density_values[last_density_pos]
                        d_bottom = d_top
                        if d_bottom > layer_top:
                            break
                        last_density_pos += 1
                        if last_density_pos >= len(density_values):
                            warnings.warn("Not enough density layers to match with layer boundaries")
                            break
                        d_top = density_top[last_density_pos]
                        d_bottom = density_bottom[last_density_pos]
                    
                    if total_weight > 0:
                        new_density_values[i] = weighted_sum / total_weight
                    else:
                        new_density_values[i] = 0.0
                density_values = new_density_values
            elif n_duplications:
                assert len(density_values) == len(n_duplications), "Density values and duplications do not match, something went wrong when assigning layer splits"
                density_values = np.repeat(density_values, n_duplications)
            snowpack.set_param("0502", density_values, len(density_values))

    # Process grain shape data - replicate values for split layers
    if "grainshape" in profile:
        grain_data = profile["grainshape"]["elements"][0]["layers"]
        grain_codes = []
        for layer in grain_data:
            # Convert grain shape codes to Swiss Code format
            # (3 digits: primary/secondary/tertiary)
            if isinstance(layer["value"], dict):
                primary = layer["value"]["primary"]
                secondary = layer["value"].get("secondary", "")
                tertiary_code = 0
                # Special case for MFcr (melt-freeze crust): MF primary,
                # tertiary flag 2
                if primary == "MFcr":
                    primary_code = 7
                    secondary_code = TYPES_TO_CODE.get(secondary, primary_code)
                    tertiary_code = 2
                else:
                    primary_code = TYPES_TO_CODE.get(primary, -1)
                    secondary_code = TYPES_TO_CODE.get(secondary, primary_code)
                    if primary_code == -1:
                        # Unknown grain type -> sentinel code -999
                        primary_code = -9
                        secondary_code = 9
                        tertiary_code = 9
                code = int(f"{primary_code}{secondary_code}{tertiary_code}")
            else:
                code = TYPES_TO_CODE.get(layer["value"], -999)
            
            grain_codes.append(code)
        
        if grain_codes:
            # Convert to numpy array
            grain_codes = np.array(grain_codes, dtype=np.int16)
            if n_duplications:
                assert len(grain_codes) == len(n_duplications), "Grain codes and duplications do not match, something went wrong when assigning layer splits"
                grain_codes = np.repeat(grain_codes, n_duplications)
            snowpack.set_param("0513", grain_codes, len(grain_codes))

    # Process grain size data - replicate values for split layers
    if "grainsize" in profile:
        size_data = profile["grainsize"]["elements"][0]["layers"]
        size_values = []
        for layer in size_data:
            if "value" in layer and layer["value"] is not None:
                size_values.append(layer["value"]["avg"])
            else:
                size_values.append(0.0)  # Default value for missing data
        if size_values:
            # Convert to numpy array
            size_values = np.array(size_values, dtype=np.float64)
            if n_duplications:
                assert len(size_values) == len(n_duplications), "Grain sizes and duplications do not match, something went wrong when assigning layer splits"
                size_values = np.repeat(size_values, n_duplications)
            snowpack.set_param("0512", size_values, len(size_values))

    # Process wetness data - replicate values for split layers
    if "wetness" in profile:
        wetness_data = profile["wetness"]["elements"][0]["layers"]
        wetness_values = []
        for layer in wetness_data:
            if "value" in layer and layer["value"] is not None:
                wetness_values.append(layer["value"])
            else:
                wetness_values.append(0.0)  # Default value for missing data
        if wetness_values:
            # Convert to numpy array
            wetness_values = np.array(wetness_values, dtype=np.float64)
            if n_duplications:
                assert len(wetness_values) == len(n_duplications), "Wetness values and duplications do not match, something went wrong when assigning layer splits"
                wetness_values = np.repeat(wetness_values, n_duplications)
            snowpack.set_param("0506", wetness_values, len(wetness_values))

    # stability index  

    
    snowpack._parsed = True
    
    # Calculate stability indices; a failure must not abort the read
    try:
        print("Calculating stability indices...")
        stability_indices = snowpack.calculate_stability_indices()
        snowpack.weak_layer = stability_indices
    except Exception as e:
        warnings.warn(f"Could not calculate stability indices: {str(e)}")
    
    return snowpack, metadata, profile_date