Module snowpat.snowpackreader

Expand source code
from .snowpackreader import readPRO, readHDF5, SnowpackReader, readCSV
from .Snowpack import Snowpack
from .json_converter import read_json_profile

__all__ = ['readPRO', 'readHDF5', 'SnowpackReader', 'readCSV', 'Snowpack', 'read_json_profile', 'Snowpack']

Sub-modules

snowpat.snowpackreader.json_converter
snowpat.snowpackreader.snowpackreader

Functions

def readCSV(filename: str)

Reads the data from a CSV file written by the toCSV method

Args

filename : str
The name of the CSV file to read the data from.

Returns

dict
The metadata.
dict
A dictionary where each date contains a dataframe with keys: data, weak_layer, surface_hoar. data is the dataframe for the respective date, and weak_layer and surface_hoar contain (if available) numpy arrays with the respective information
Expand source code
def readCSV(filename:str):
    """Reads the data from a CSV file written by the toCSV method
    
    Args:
        filename (str): The name of the CSV file to read the data from.
        
    Returns:
        dict: The metadata.
        dict: A dictionary where each date contains a dataframe with keys: data, weak_layer, surface_hoar.
                data is the dataframe for the respective date, and weak_layer and surface_hoar contain (if available) numpy arrays with the respective information
    """
    metadata = {}
    dataframes = {}
    df_start_lines = [0]
    df_end_lines = []
    counter = 0
    with open(filename, 'r') as file:
        lines = file.readlines()
        date = None
        section = ""
        for line_id, line in enumerate(lines):
            if "METADATA" in line:
                section = "metadata"
                continue
            elif "DATA" in line:
                section = "data"
                continue
            if line.startswith('#') and section == "metadata":
                key, value = line[2:].strip().split(' = ')
                metadata[key] = value
            elif line.startswith('#') and section == "data":
                if "Date" in line:
                    date = datetime.datetime.strptime(line.split('=')[1].strip(), '%d.%m.%Y %H:%M:%S')
                    dataframes[date] = {"data": []}
                    counter = line_id
                    df_end_lines.append(line_id)
                elif "weak layer" in line:
                    dataframes[date]["weak_layer"] = np.array(line.split(',')[2:], dtype=float)
                    counter = line_id
                elif "surface hoar" in line:
                    dataframes[date]["surface_hoar"] = np.array(line.split(',')[2:],dtype=float)
                    counter = line_id
            else:
                if counter not in df_start_lines:
                    df_start_lines.append(counter)
        df_start_lines.pop(0)
        df_end_lines.pop(0)
        
    for id, date in enumerate(dataframes.keys()):
        dataframes[date]["data"] = pd.read_csv(filename,  skiprows=df_start_lines[id], nrows=df_end_lines[id]-df_start_lines[id]-1)
    
    return metadata, dataframes
def readHDF5(filename: str)

Reads the data from a HDF5 file

Args

filename : str
The name of the HDF5 file to read the data from.

Returns

dict
The metadata.
dict
A dictionary where each date contains a dataframe with the profile.
Expand source code
def readHDF5(filename:str):
    """Reads the data from a HDF5 file

    Args:
        filename (str): The name of the HDF5 file to read the data from.

    Returns:
        dict: The metadata.
        dict: A dictionary where each date contains a dataframe with the profile.
    """
    metadata = {}
    data = {}

    with h5py.File(filename, 'r') as file:
        # Read metadata
        for key, value in file.attrs.items():
            metadata[key] = value
        
        # Read data
        for date in file.keys():
            group = file[date]
            weak_layer = group.attrs.get('weak_layer', None)
            surface_hoar = group.attrs.get("surface_hoar", None)
            col_names = group.attrs.get("fields")
            dat = group.get("data")
            data[date] = pd.DataFrame(dat[:], columns=col_names)
            

    return metadata, data
def readPRO(filename: str)

Reads the snowpack file and returns a SnowpackReader object.

Args

filename : str
The path to the snowpack file.

Returns

SnowpackReader
The SnowpackReader object representing the snowpack file.
Expand source code
def readPRO(filename:str):
    """
    Reads the snowpack file and returns a SnowpackReader object.

    Args:
        filename (str): The path to the snowpack file.

    Returns:
        SnowpackReader: The SnowpackReader object representing the snowpack file.
    """
    return SnowpackReader(filename)
def read_json_profile(json_file: str, max_layer_size: Optional[float] = None) ‑> Tuple[snowpat.snowpackreader.Snowpack.Snowpack, Dict, datetime.datetime]

Read a snowpack profile from a JSON file and convert it to a Snowpack object.

Args

json_file : str
Path to the JSON file containing the snowpack profile
max_layer_size : Optional[float]
Maximum allowed layer size. Layers exceeding this will be split.

Returns

Tuple[Snowpack, Dict, datetime.datetime]
A tuple containing: - Snowpack object with the profile data - Dictionary with station metadata - Profile date
Expand source code
def read_json_profile(json_file: str, max_layer_size: Optional[float] = None) -> Tuple[Snowpack, Dict, datetime.datetime]:
    """
    Read a snowpack profile from a JSON file and convert it to a Snowpack object.
    
    Args:
        json_file (str): Path to the JSON file containing the snowpack profile
        max_layer_size (Optional[float]): Maximum allowed layer size. Layers exceeding this will be split.
        
    Returns:
        Tuple[Snowpack, Dict, datetime.datetime]: A tuple containing:
            - Snowpack object with the profile data
            - Dictionary with station metadata
            - Profile date
    """
    with open(json_file, 'r') as f:
        data = json.load(f)
    
    # Extract station metadata
    metadata = {
        "StationName": data["name"],
        "Latitude": data["position"]["latitude"],
        "Longitude": data["position"]["longitude"],
        "Altitude": data["position"]["altitude"],
        "SlopeAngle": data["position"]["angle"],
        "SlopeAzi": data["position"]["_azimuth"]
    }
    
    # Create Snowpack object
    snowpack = Snowpack()
    
    # Get the first profile (assuming single date)
    profile = data["profiles"][0]
    
    # Get profile date
    profile_date = datetime.datetime.fromisoformat(profile["date"])
    
    # Process layer data
    layers = []
    
    # Extract layers from hardness data (which contains the layer structure)
    hardness_data = profile["hardness"]["elements"][0]["layers"]
    
    # Sort layers by bottom height to ensure correct ordering
    hardness_data.sort(key=lambda x: x["bottom"])
    
    duplicate_layers = []
        
    # Convert layer boundaries to format needed by Snowpack
    boundaries = []
    for layer in hardness_data:
        if not boundaries:  # First layer
            boundaries.append(layer["bottom"])
        boundaries.append(layer["top"])
    
    # Convert hardness values to Newton scale if needed (assuming values > 6 indicate Newton scale)
    hardness_values = []
    is_newton = any(layer["value"] > 6 for layer in hardness_data)
    snowpack.isNewton = is_newton
    for layer in hardness_data:
        hardness_values.append(layer["value"])
    
    n_duplications = []
    if max_layer_size is not None:
        new_boundaries = []
        for i in range(len(boundaries) - 1):
            layer_size = boundaries[i+1] - boundaries[i]
            if layer_size > max_layer_size:
                # Calculate splits using numpy for better numerical precision
                n_splits = math.ceil(layer_size / max_layer_size)
                split_points = np.linspace(boundaries[i], boundaries[i+1], n_splits + 1, dtype=np.float64)
                
                # Add all points except the last one (it will be added in the next iteration)
                new_boundaries.extend(split_points[:-1])
            else:
                n_splits = 1
                new_boundaries.append(boundaries[i])
            n_duplications.append(n_splits)
            
        # Add final boundary
        new_boundaries.append(boundaries[-1])
        
        # Convert to numpy arrays
        boundaries = np.array(new_boundaries, dtype=np.float64)
    else:
        boundaries = np.array(boundaries, dtype=np.float64)
    
    hardness_values = np.array(hardness_values, dtype=np.float64)
    if n_duplications:
        assert len(n_duplications) == len(hardness_values), "Number of duplications does not match number of hardness values"
        hardness_values = np.repeat(hardness_values, n_duplications)
    # Set layer boundaries and hardness
    snowpack.set_param("0501", boundaries, len(boundaries))
    snowpack.set_param("0534", hardness_values, len(hardness_values))

    # make new fields, so it is clearer, where the layers actually are
    layer_middle = [
        (snowpack.layer_boundaries[i + 1] + snowpack.layer_boundaries[i]) / 2
        for i in range(snowpack.layer_boundaries.size - 1)
    ]
    layer_thicknes = [
        (snowpack.layer_boundaries[i + 1] - snowpack.layer_boundaries[i]) / 2
        for i in range(snowpack.layer_boundaries.size - 1)
    ]

    layer_middle = np.array(layer_middle)
    layer_thicknes = np.array(layer_thicknes)

    snowpack.data["layer middle"] = layer_middle
    snowpack.data["layer thickness"] = layer_thicknes
    if len(snowpack._height_mask) > len(layer_middle):
        snowpack._height_mask = np.delete(snowpack._height_mask, -1)

    # Process temperature data - the only property that should be interpolated
    if "temperature" in profile:
        temp_data = profile["temperature"]["elements"][0]["layers"]
        temp_values = []
        temp_pos = []
        for layer in temp_data:
            temp_values.append(layer["value"])
            temp_pos.append(layer["bottom"])
        
        # Convert to numpy arrays for efficient interpolation
        temp_pos = np.array(temp_pos, dtype=np.float64)
        temp_values = np.array(temp_values, dtype=np.float64)
        
        # Interpolate temperatures to layer midpoints
        temp_values = np.interp(layer_middle, temp_pos, temp_values)
        snowpack.set_param("0503", temp_values, len(temp_values))

    # Process density data - replicate values for split layers
    if "density" in profile:
        density_data = profile["density"]["elements"][0]["layers"]
        density_top = []
        density_bottom = []
        density_values = []
        for layer in density_data:
            if "value" in layer and layer["value"] is not None:
                density_values.append(layer["value"])
                density_top.append(layer["top"])
                density_bottom.append(layer["bottom"])
            else:
                density_values.append(0.0)  # Default value for missing data

        if density_values:
            # Convert to numpy array
            density_values = np.array(density_values, dtype=np.float64)
            density_top = np.array(density_top, dtype=np.float64)
            density_bottom = np.array(density_bottom, dtype=np.float64)
            density_boundaries = np.unique(np.concatenate([density_top, density_bottom]))
            density_boundaries.sort()
            if len(density_values) != len(layer_middle) or not all(density_boundaries == snowpack.layer_boundaries):
                print("Computing Mean densities for layers, provided density layers do not match layer boundaries")
                new_density_values = np.zeros(len(layer_middle))
                last_density_pos = 0
                for i in range(len(layer_middle)):
                    midpoint = layer_middle[i]
                    thickness = layer_thicknes[i]
                    layer_bottom = midpoint - thickness / 2
                    layer_top = midpoint + thickness / 2
                    # Weighted mean: compute overlap of density intervals with this layer
                    total_weight = 0.0
                    weighted_sum = 0.0
                    d_top = density_top[last_density_pos]
                    d_bottom = density_bottom[last_density_pos]
                    if d_top < layer_bottom:
                        last_density_pos = np.where(density_top >= layer_bottom)[0][0]
                        if not (last_density_pos > 0):
                            raise ValueError("Could not match density layers with layer boundaries")
                        d_top = density_top[last_density_pos]
                        d_bottom = density_bottom[last_density_pos]
                    if d_bottom > layer_top:
                        continue
                    
                    while d_bottom < layer_top:
                        layer_overlap = d_top - d_bottom if d_top < layer_top else layer_top - d_bottom
                        total_weight += layer_overlap
                        weighted_sum += layer_overlap * density_values[last_density_pos]
                        d_bottom = d_top
                        if d_bottom > layer_top:
                            break
                        last_density_pos += 1
                        if last_density_pos >= len(density_values):
                            warnings.warn("Not enough density layers to match with layer boundaries")
                            break
                        d_top = density_top[last_density_pos]
                        d_bottom = density_bottom[last_density_pos]
                    
                    if total_weight > 0:
                        new_density_values[i] = weighted_sum / total_weight
                    else:
                        new_density_values[i] = 0.0
                density_values = new_density_values
            elif n_duplications:
                assert len(density_values) == len(n_duplications), "Density values and duplications do not match, something went wrong when assigning layer splits"
                density_values = np.repeat(density_values, n_duplications)
            snowpack.set_param("0502", density_values, len(density_values))

    # Process grain shape data - replicate values for split layers
    if "grainshape" in profile:
        grain_data = profile["grainshape"]["elements"][0]["layers"]
        grain_codes = []
        for layer in grain_data:
            # Convert grain shape codes to Swiss Code format
            if isinstance(layer["value"], dict):
                primary = layer["value"]["primary"]
                secondary = layer["value"].get("secondary", "")
                tertiary_code = 0
                # Special case for MFcr
                if primary == "MFcr":
                    primary_code = 7
                    secondary_code = TYPES_TO_CODE.get(secondary, primary_code)
                    tertiary_code = 2
                else:
                    primary_code = TYPES_TO_CODE.get(primary, -1)
                    secondary_code = TYPES_TO_CODE.get(secondary, primary_code)
                    if primary_code == -1:
                        primary_code = -9
                        secondary_code = 9
                        tertiary_code = 9
                code = int(f"{primary_code}{secondary_code}{tertiary_code}")
            else:
                code = TYPES_TO_CODE.get(layer["value"], -999)
            
            grain_codes.append(code)
        
        if grain_codes:
            # Convert to numpy array
            grain_codes = np.array(grain_codes, dtype=np.int16)
            if n_duplications:
                assert len(grain_codes) == len(n_duplications), "Grain codes and duplications do not match, something went wrong when assigning layer splits"
                grain_codes = np.repeat(grain_codes, n_duplications)
            snowpack.set_param("0513", grain_codes, len(grain_codes))

    # Process grain size data - replicate values for split layers
    if "grainsize" in profile:
        size_data = profile["grainsize"]["elements"][0]["layers"]
        size_values = []
        for layer in size_data:
            if "value" in layer and layer["value"] is not None:
                size_values.append(layer["value"]["avg"])
            else:
                size_values.append(0.0)  # Default value for missing data
        if size_values:
            # Convert to numpy array
            size_values = np.array(size_values, dtype=np.float64)
            if n_duplications:
                assert len(size_values) == len(n_duplications), "Grain sizes and duplications do not match, something went wrong when assigning layer splits"
                size_values = np.repeat(size_values, n_duplications)
            snowpack.set_param("0512", size_values, len(size_values))

    # Process wetness data - replicate values for split layers
    if "wetness" in profile:
        wetness_data = profile["wetness"]["elements"][0]["layers"]
        wetness_values = []
        for layer in wetness_data:
            if "value" in layer and layer["value"] is not None:
                wetness_values.append(layer["value"])
            else:
                wetness_values.append(0.0)  # Default value for missing data
        if wetness_values:
            # Convert to numpy array
            wetness_values = np.array(wetness_values, dtype=np.float64)
            if n_duplications:
                assert len(wetness_values) == len(n_duplications), "Wetness values and duplications do not match, something went wrong when assigning layer splits"
                wetness_values = np.repeat(wetness_values, n_duplications)
            snowpack.set_param("0506", wetness_values, len(wetness_values))

    # stability index  

    
    snowpack._parsed = True
    
    # Calculate stability indices
    try:
        print("Calculating stability indices...")
        stability_indices = snowpack.calculate_stability_indices()
        snowpack.weak_layer = stability_indices
    except Exception as e:
        warnings.warn(f"Could not calculate stability indices: {str(e)}")
    
    return snowpack, metadata, profile_date

Classes

class Snowpack

Initializes a Snowpack object. Used to handle snow profile data from Snowpack.

Except for obtaining the original height in the .pro file it is advised to only use the getter methods to access the data.

Attributes

layer_boundaries : np.ndarray
The height of the layer boundaries. (Height as written in the Snowpack output)

Methods

get_param(code:str): Returns a parameter from the Snowpack object for the given data code. discard_below_ground(discard:bool): Sets whether to return data below ground level (default: True). toDf(integrate:bool): Converts the Snowpack object to a pandas DataFrame. With either integrated weak layer and surface hoar or as metadata accssible as: df.weak_layer; df.surface_hoar

Args

None

Returns

None

Expand source code
class Snowpack:
    def __init__(self):
        """
        Initializes a Snowpack object. Used to handle snow profile data from Snowpack.

        Except for obtaining the original height in the .pro file it is advised to only use the getter methods to access the data.

        Attributes:
            layer_boundaries (np.ndarray): The height of the layer boundaries. (Height as written in the Snowpack output)

        Methods:
            get_param(code:str): Returns a parameter from the Snowpack object for the given data code.
            discard_below_ground(discard:bool): Sets whether to return data below ground level (default: True).
            toDf(integrate:bool): Converts the Snowpack object to a pandas DataFrame. With either integrated weak layer and surface hoar or as metadata accssible as: df.weak_layer; df.surface_hoar

        Args:
            None

        Returns:
            None
        """
        self.layer_boundaries: Optional[np.ndarray] = None
        self.data: Dict[str, np.ndarray] = {}
        self.surface_hoar: Optional[np.ndarray] = None
        self.weak_layer: Optional[np.ndarray] = None
        self.isNewton: Optional[bool] = None
        self.old_hardness: Optional[bool] = None

        # internal variables
        self._height_mask: Optional[np.ndarray] = None
        self._above_ground = False
        self.num_nodes = None
        self._parsed = False
        
    def set_param(self, code: str, values: np.ndarray, boundaries: int):
        if not self.num_nodes:
            self.num_nodes = boundaries
        if code == "0501":
            self.layer_boundaries = values
            self._height_mask = self.layer_boundaries >= 0
        elif code == "0514":
            self.surface_hoar = values
        elif code == "0530":
            self.weak_layer = values
        else:
            self.data[code] = values
        
    def get_param(self, code: str, return_missing: bool = False):
        """
        Retrieves the parameter associated with the given code.

        Args:
            code (str): The code of the parameter to retrieve.
            return_missing (bool): Will return a missing value (-999) instead of showing a warning when a variable is not present

        Returns:
            The parameter associated with the given code.

        Raises:
            KeyError: If the code is not found in the data.

        """
        # need to add these, because they were handled seperately and can also be called through different means
        possible_codes = list(self.data.keys()) + ["0501", "0530", "0514"]
        if not code in possible_codes:
            if return_missing:
                return np.full(len(self.data["layer middle"]),-999.0) # The Snowpack Missing Value
            print(f"{code} is invalid")
            print("available codes are:")
            print(f"{self.data.keys()}")
            return
            
        if code == "0501":
            if self._above_ground:
                mask = np.append(self._height_mask,True)
                return self.layer_boundaries[mask]
            else:
                return self.layer_boundaries

        if code == "0514":
            return self.surface_hoar
        if code == "0530":
            return self.weak_layer

        param = self.data[code]
        if self._above_ground:
            param = param[self._height_mask]
        return param

    def discard_below_ground(self, discard: bool):
        """
        Sets whether to return data below ground level.

        If set to true, only data above ground will be returned, by the getter methods.
        Otherwise all the data will be returned.

        Can be used subsequently.

        Args:
            discard (bool): If True, data below ground level will be discarded. If False, all data will be kept.

        Returns:
            None
        """
        self._above_ground = discard

    def _parse_data(self, old_hardness:bool):
        # snowpack sometimes does not explicitly put a boundary at 0, so we need to append that
        if self.layer_boundaries[0] > 0:
            self.num_nodes += 1
            self.layer_boundaries = np.insert(self.layer_boundaries, 0, 0)
            self._height_mask = np.insert(self._height_mask, 0, True)
        
        # nodes give the boundaries, but values are valid for the whole layer
        n_layers = self.num_nodes -1
        for key, val in self.data.items():
            # grain types has surface hoar as 0, and is specified with a dfferent code
            if key == "0513" and len(val) == n_layers + 1: 
                self.data[key] = np.delete(val, -1)
            # fill missing layers with nans
            if self.data[key].size != n_layers:
                try:
                    self.data[key] = np.insert(
                        self.data[key], 0, [np.nan for _ in range(n_layers - self.data[key].size)]
                    )
                except:
                    print(n_layers)
                    print(self.data[key].size)
                    print(f"{key} has {self.data[key].size} values, but {n_layers} layers")


        # make new fields, so it is clearer, where the layers actually are
        layer_middle = [
            (self.layer_boundaries[i + 1] + self.layer_boundaries[i]) / 2
            for i in range(self.layer_boundaries.size - 1)
        ]
        layer_thicknes = [
            (self.layer_boundaries[i + 1] - self.layer_boundaries[i]) / 2
            for i in range(self.layer_boundaries.size - 1)
        ]

        layer_middle = np.array(layer_middle)
        layer_thicknes = np.array(layer_thicknes)

        self.data["layer middle"] = layer_middle
        self.data["layer thickness"] = layer_thicknes
        if len(self._height_mask) > n_layers:
            self._height_mask = np.delete(self._height_mask, -1)
        
        
        # check how the hardness is specified
        if "0534" in self.data.keys():
            if old_hardness:    
                self.isNewton = all(self.data["0534"] > 0)
                self.old_hardness = True
            else:
                self.isNewton = any(self.data["0534"] > 6) 
                self.old_hardness = False
        else:
            self.old_hardness = True

    def toDf(self, CodesToName: Dict = None, integrate: bool = False):
        """
        Converts the Snowpack object to a pandas DataFrame.

        In the data frame the heights given in the Snowpack output, which essentially are the layer boundaries,
        are converted to the middle of the layers and the thickness of the layers, for a clean data frame.
        The original layer boundaries can easily be computed from that.

        The minimum stability indices (weak_layer) and surface hoar information are available as:
        df.weak_layer and df.surface_hoar. However, this information will not be passed on when merging... the dataframe
        as pandas does not handle this yet.

        Args:
            CodesToName (Dict, optional): A dictionary mapping column data codes to column names.

        Returns:
            DataFrame: The Snowpack data as a pandas DataFrame.
        """
        df = pd.DataFrame(self.data)
        cols = (
            ["layer middle"]
            + ["layer thickness"]
            + [
                col
                for col in df.columns
                if col != "layer middle" and col != "layer thickness"
            ]
        )
        df = df[cols]
        if self._above_ground:
            df = df[self._height_mask]

        if CodesToName:
            df.rename(columns=CodesToName, inplace=True)
        if integrate:
            df.weak_layer = None
            df.surface_hoar = None
            if self.surface_hoar is not None:
                df["surface hoar"] = [
                    np.nan for _ in range(df["layer middle"].size - 1)
                ] + [self.surface_hoar]
            if self.weak_layer is not None:
                df["weak layer"] = [self.weak_layer] + [
                    np.nan for _ in range(df["layer middle"].size - 1)
                ]
        else:
            warnings.filterwarnings('ignore', 'Pandas doesn\'t allow columns to be created via a new attribute name')
            df.weak_layer = self.weak_layer
            df.surface_hoar = self.surface_hoar
        return df

    def write_pro_file(self, filepath: str, station_params: dict = None, profile_date: datetime = None):
        """
        Writes the Snowpack data to a .pro file.

        Args:
            filepath (str): Path to save the .pro file
            station_params (dict, optional): Dictionary containing station parameters.
                Keys should be: StationName, Latitude, Longitude, Altitude, SlopeAngle, SlopeAzi
                If not provided, default values will be used.

        Returns:
            None
        """
        if not self._parsed:
            raise ValueError("Snowpack data has not been parsed yet.")

        # Default station parameters if not provided
        default_params = {
            "StationName": "Unknown",
            "Latitude": 0.0,
            "Longitude": 0.0,
            "Altitude": 0.0,
            "SlopeAngle": 0,
            "SlopeAzi": 0
        }
        station_params = station_params or default_params

        with open(filepath, 'w') as f:
            # Write station parameters
            f.write("[STATION_PARAMETERS]\n")
            for key, value in station_params.items():
                if key == "SlopeAzi" and value == None:
                    value = 0
                f.write(f"{key}= {value}\n")
            f.write("\n")

            # Write header section
            f.write("[HEADER]\n")
            f.write("0500,Date\n")
            f.write("0501,nElems,height [> 0: top, < 0: bottom of elem.] (cm)\n")
            f.write("0502,nElems,element density (kg m-3)\n")
            f.write("0503,nElems,element temperature (degC)\n")
            f.write("0504,nElems,element ID (1)\n")
            f.write("0506,nElems,liquid water content by volume (%)\n")
            f.write("0508,nElems,dendricity (1)\n")
            f.write("0509,nElems,sphericity (1)\n")
            f.write("0510,nElems,coordination number (1)\n")
            f.write("0511,nElems,bond size (mm)\n")
            f.write("0512,nElems,grain size (mm)\n")
            f.write("0513,nElems,grain type (Swiss Code F1F2F3)\n")
            f.write("0514,3,grain type, grain size (mm), and density (kg m-3) of SH at surface\n")
            f.write("0515,nElems,ice volume fraction (%)\n")
            f.write("0516,nElems,air volume fraction (%)\n")
            f.write("0517,nElems,stress in (kPa)\n")
            f.write("0518,nElems,viscosity (GPa s)\n")
            f.write("0519,nElems,soil volume fraction (%)\n")
            f.write("0520,nElems,temperature gradient (K m-1)\n")
            f.write("0521,nElems,thermal conductivity (W K-1 m-1)\n")
            f.write("0522,nElems,absorbed shortwave radiation (W m-2)\n")
            f.write("0523,nElems,viscous deformation rate (1.e-6 s-1)\n")
            f.write("0530,8,position (cm) and minimum stability indices:\n")
            f.write("profile type, stability class, z_Sdef, Sdef, z_Sn38, Sn38, z_Sk38, Sk38\n")
            f.write("0531,nElems,deformation rate stability index Sdef\n")
            f.write("0532,nElems,natural stability index Sn38\n")
            f.write("0533,nElems,stability index Sk38\n")
            f.write("0534,nElems,hand hardness either (N) or index steps (1)\n")
            f.write("0535,nElems,optical equivalent grain size (mm)\n")
            f.write("0601,nElems,snow shear strength (kPa)\n")
            f.write("0602,nElems,grain size difference (mm)\n")
            f.write("0603,nElems,hardness difference (1)\n")
            f.write("0604,nElems,ssi\n")
            f.write("0605,nElems,inverse texture index ITI (Mg m-4)\n")
            f.write("0606,nElems,critical cut length (m)\n")
            f.write("\n")

            # Write data section
            f.write("[DATA]\n")
            
            # Write date
            current_date = profile_date.strftime("%d.%m.%Y %H:%M:%S")
            f.write(f"0500,{current_date}\n")

            # Write layer boundaries
            boundaries = self.get_param("0501")
            f.write(f"0501,{len(boundaries)},{','.join(map(str, boundaries))}\n")

            # Write all other parameters
            param_codes = [code for code in self.data.keys() if code not in ["layer middle", "layer thickness"]]
            for code in sorted(param_codes):
                values = self.get_param(code)
                if values is not None:
                    if code == "0534" and not self.isNewton: # snowpack only gives 0.5 and hand hardness is negative
                        for (i,value) in enumerate(values):
                            if math.isclose(value%1, 1/3) or math.isclose(value%1, 2/3):
                                value = math.floor(value) + 0.5
                            values[i] = -1 * value
                    if code == "0513":
                        values = np.append(values, -999) # for some reason we need surface here
                        if np.any(values <100) and np.any(values >0):
                            out_vals = [f"{val}" if val > 100 or val < 0 else f"000" for val in values]
                            values = out_vals
                    f.write(f"{code},{len(values)},{','.join(map(str, values))}\n")

            # Write special codes (surface hoar and weak layer)
            if self.surface_hoar is not None:
                f.write(f"0514,3,{','.join(map(str, self.surface_hoar))}\n")
            if self.weak_layer is not None:
                f.write(f"0530,8,{','.join(map(str, self.weak_layer))}\n")

    def calculate_stability_indices(self) -> np.ndarray:
        """
        Calculate stability indices (SSI, Sk38, Sn38) for the snowpack.
        Returns array with format: [profile_type, stability_class, z_Sdef, Sdef, z_Sn38, Sn38, z_Sk38, Sk38]
        
        Based on:
        - Schweizer et al. (2006) - A threshold sum approach to stability evaluation of manual profiles
        - Monti et al. (2016) - Snow instability evaluation: calculating the skier-induced stress in a multi-layered snowpack
        - Schweizer and Jamieson (2003) - Snowpack properties for snow profile analysis
        """
        if not self._parsed:
            raise ValueError("Data must be parsed before calculating stability indices")
            
        # Get required layer properties
        heights = self.get_param("layer middle")  # cm
        thicknesses = self.get_param("layer thickness")  # cm
        densities = self.get_param("0502")  # kg/m³
        temps = self.get_param("0503")  # °C
        grain_types = self.get_param("0513")  # Code
        grain_sizes = self.get_param("0512")  # mm
        hardness = self.get_param("0534")  # N if isNewton else hand hardness index
        
        g = 9.81  # m/s²
        skier_load = 1500  # N - represents typical skier load
        
        # Initialize arrays
        n_layers = len(heights)
        # use primary grain type codes (0-9) from swiss F1F2F3 codes
        primary_gt = np.zeros(n_layers, dtype=np.int16)
        for (i,gt) in enumerate(grain_types):
            if gt == -999:
                primary_gt[i] = -999
            elif gt <100 and gt >0:
                primary_gt[i] = 0
            else:
                primary_gt[i] = int(gt // 100)
                
        stress = np.zeros(n_layers)
        strength = np.zeros(n_layers)
        Sn38 = np.zeros(n_layers)
        Sk38 = np.zeros(n_layers)
        # placeholder for SSI between layers
        ssi = np.zeros(n_layers - 1)
        
        # Calculate penetration depth Pk (m) - C++ compPenetrationDepth
        depth_m = np.sum(thicknesses) / 100.0
        rho_Pk_num = 0.0
        dz_Pk = 0.0
        cum_depth = 0.0
        for j in range(n_layers-1, -1, -1):
            if cum_depth >= 30.0: break
            Lm = thicknesses[j] / 100.0
            rho_Pk_num += densities[j] * Lm
            dz_Pk += Lm
            cum_depth += thicknesses[j]
        if dz_Pk > 0.0:
            rho_Pk = rho_Pk_num / dz_Pk
            Pk = min(0.8 * 43.3 / rho_Pk, depth_m)
        else:
            Pk = 0.0

        # Precompute reference slope angle parameters (psi_ref = 38°)
        psi_ref = 38.0
        cos_psi = math.cos(math.radians(psi_ref))
        sin_psi = math.sin(math.radians(psi_ref))
        ski_load = 85.0 * g / 1.7

        # Calculate stress, strength, Sn38 and Sk38 using reduced stresses
        for i in range(n_layers):
            # overburden stress (Pa)
            if densities[i] > 0:
                stress[i] = np.sum(densities[:i+1] * thicknesses[:i+1]) * g / 100.0
            # layer strength (shear strength Sig_c2)
            if self.isNewton:
                strength[i] = hardness[i]
            else:
                strength[i] = 2.91 * np.exp(2.91 * hardness[i])
            if temps[i] is not None:
                tf = max(0.5, 1.0 - 0.01 * abs(temps[i]))
                strength[i] *= tf
            if primary_gt[i] in [5, 6]:
                strength[i] *= 0.7
            # reduced stresses
            sig_n = -stress[i] / 1000.0
            sig_s = sig_n * sin_psi / cos_psi
            # natural stability index (getNaturalStability)
            if sig_s > 0.0:
                Sn38[i] = max(0.05, min(6.0, strength[i] / sig_s))
            else:
                Sn38[i] = 6.0
            # skier stability index (getLayerSkierStability)
            depth_lay = np.sum(thicknesses[:i+1]) / 100.0
            layer_depth = depth_lay - Pk
            if layer_depth > 1e-6:
                delta_sig = 2.0 * ski_load * math.cos(math.radians(psi_ref)) * (math.sin(math.radians(psi_ref))**2) * math.sin(math.radians(2*psi_ref))
                delta_sig /= math.pi * layer_depth * cos_psi
                delta_sig /= 1000.0
                Sk38[i] = max(0.05, min(6.0, strength[i] / (sig_s + delta_sig)))
            else:
                Sk38[i] = 6.0

        # Compute SSI and record lemon counts per interface (C++ initStructuralStabilityIndex)
        max_stab = 6.0
        nmax_lemon = 2
        lemons_arr = np.zeros(n_layers - 1, dtype=int)
        for i in range(n_layers - 1):
            lemons = 0
            if abs(hardness[i] - hardness[i+1]) > 1.5:
                lemons += 1
            if grain_sizes is not None and 2 * abs(grain_sizes[i] - grain_sizes[i+1]) > 0.5:
                lemons += 1
            lemons_arr[i] = lemons
            val = nmax_lemon - lemons + Sk38[i+1]
            ssi[i] = max(0.05, min(max_stab, val))
        
        # Determine weak-layer location and values
        min_Sn38_idx = np.argmin(Sn38)
        min_Sk38_idx = np.argmin(Sk38)
        min_ssi_idx = np.argmin(ssi)
        Swl_ssi = ssi[min_ssi_idx]
        Swl_lemon = int(lemons_arr[min_ssi_idx])
        Swl_Sk38 = Sk38[min_ssi_idx+1]
        # Profile type: apply surface hoar flag, fallback on error
        if not (Swl_ssi > 0 and Swl_ssi < 100):
            profile_type = -1
        else:
            profile_type = 2 if np.any(primary_gt == 6) else 1
        # Stability class via SchweizerBellaire2
        if Swl_ssi > 0 and Swl_ssi < 100:
            if Swl_lemon >= 2:
                stability_class = 1
            elif Swl_lemon == 1:
                if Swl_Sk38 < 0.48:
                    stability_class = 1
                elif Swl_Sk38 < 0.71:
                    stability_class = 3
                else:
                    stability_class = 5
            else:
                stability_class = 3
        else:
            stability_class = -1
        # Format output according to 0530 code spec
        result = np.array([
            profile_type,
            stability_class,
            heights[min_ssi_idx],   # z_Sdef
            ssi[min_ssi_idx],       # Sdef (SSI)
            heights[min_Sn38_idx],  # z_Sn38
            Sn38[min_Sn38_idx],     # Sn38
            heights[min_Sk38_idx],  # z_Sk38
            Sk38[min_Sk38_idx]      # Sk38
        ])
        
        # Store result in weak_layer property
        self.weak_layer = result
        self.set_param("0530", result, len(result))
        self.set_param("0604", ssi, len(ssi))
        
        return result

Methods

def calculate_stability_indices(self) ‑> numpy.ndarray

Calculate stability indices (SSI, Sk38, Sn38) for the snowpack. Returns array with format: [profile_type, stability_class, z_Sdef, Sdef, z_Sn38, Sn38, z_Sk38, Sk38]

Based on: - Schweizer et al. (2006) - A threshold sum approach to stability evaluation of manual profiles - Monti et al. (2016) - Snow instability evaluation: calculating the skier-induced stress in a multi-layered snowpack - Schweizer and Jamieson (2003) - Snowpack properties for snow profile analysis

Expand source code
def calculate_stability_indices(self) -> np.ndarray:
    """
    Calculate stability indices (SSI, Sk38, Sn38) for the snowpack.
    Returns array with format: [profile_type, stability_class, z_Sdef, Sdef, z_Sn38, Sn38, z_Sk38, Sk38]
    
    Based on:
    - Schweizer et al. (2006) - A threshold sum approach to stability evaluation of manual profiles
    - Monti et al. (2016) - Snow instability evaluation: calculating the skier-induced stress in a multi-layered snowpack
    - Schweizer and Jamieson (2003) - Snowpack properties for snow profile analysis
    """
    if not self._parsed:
        raise ValueError("Data must be parsed before calculating stability indices")
        
    # Get required layer properties
    heights = self.get_param("layer middle")  # cm
    thicknesses = self.get_param("layer thickness")  # cm
    densities = self.get_param("0502")  # kg/m³
    temps = self.get_param("0503")  # °C
    grain_types = self.get_param("0513")  # Code
    grain_sizes = self.get_param("0512")  # mm
    hardness = self.get_param("0534")  # N if isNewton else hand hardness index
    
    g = 9.81  # m/s²
    skier_load = 1500  # N - represents typical skier load
    
    # Initialize arrays
    n_layers = len(heights)
    # use primary grain type codes (0-9) from swiss F1F2F3 codes
    primary_gt = np.zeros(n_layers, dtype=np.int16)
    for (i,gt) in enumerate(grain_types):
        if gt == -999:
            primary_gt[i] = -999
        elif gt <100 and gt >0:
            primary_gt[i] = 0
        else:
            primary_gt[i] = int(gt // 100)
            
    stress = np.zeros(n_layers)
    strength = np.zeros(n_layers)
    Sn38 = np.zeros(n_layers)
    Sk38 = np.zeros(n_layers)
    # placeholder for SSI between layers
    ssi = np.zeros(n_layers - 1)
    
    # Calculate penetration depth Pk (m) - C++ compPenetrationDepth
    depth_m = np.sum(thicknesses) / 100.0
    rho_Pk_num = 0.0
    dz_Pk = 0.0
    cum_depth = 0.0
    for j in range(n_layers-1, -1, -1):
        if cum_depth >= 30.0: break
        Lm = thicknesses[j] / 100.0
        rho_Pk_num += densities[j] * Lm
        dz_Pk += Lm
        cum_depth += thicknesses[j]
    if dz_Pk > 0.0:
        rho_Pk = rho_Pk_num / dz_Pk
        Pk = min(0.8 * 43.3 / rho_Pk, depth_m)
    else:
        Pk = 0.0

    # Precompute reference slope angle parameters (psi_ref = 38°)
    psi_ref = 38.0
    cos_psi = math.cos(math.radians(psi_ref))
    sin_psi = math.sin(math.radians(psi_ref))
    ski_load = 85.0 * g / 1.7

    # Calculate stress, strength, Sn38 and Sk38 using reduced stresses
    for i in range(n_layers):
        # overburden stress (Pa)
        if densities[i] > 0:
            stress[i] = np.sum(densities[:i+1] * thicknesses[:i+1]) * g / 100.0
        # layer strength (shear strength Sig_c2)
        if self.isNewton:
            strength[i] = hardness[i]
        else:
            strength[i] = 2.91 * np.exp(2.91 * hardness[i])
        if temps[i] is not None:
            tf = max(0.5, 1.0 - 0.01 * abs(temps[i]))
            strength[i] *= tf
        if primary_gt[i] in [5, 6]:
            strength[i] *= 0.7
        # reduced stresses
        sig_n = -stress[i] / 1000.0
        sig_s = sig_n * sin_psi / cos_psi
        # natural stability index (getNaturalStability)
        if sig_s > 0.0:
            Sn38[i] = max(0.05, min(6.0, strength[i] / sig_s))
        else:
            Sn38[i] = 6.0
        # skier stability index (getLayerSkierStability)
        depth_lay = np.sum(thicknesses[:i+1]) / 100.0
        layer_depth = depth_lay - Pk
        if layer_depth > 1e-6:
            delta_sig = 2.0 * ski_load * math.cos(math.radians(psi_ref)) * (math.sin(math.radians(psi_ref))**2) * math.sin(math.radians(2*psi_ref))
            delta_sig /= math.pi * layer_depth * cos_psi
            delta_sig /= 1000.0
            Sk38[i] = max(0.05, min(6.0, strength[i] / (sig_s + delta_sig)))
        else:
            Sk38[i] = 6.0

    # Compute SSI and record lemon counts per interface (C++ initStructuralStabilityIndex)
    max_stab = 6.0
    nmax_lemon = 2
    lemons_arr = np.zeros(n_layers - 1, dtype=int)
    for i in range(n_layers - 1):
        lemons = 0
        if abs(hardness[i] - hardness[i+1]) > 1.5:
            lemons += 1
        if grain_sizes is not None and 2 * abs(grain_sizes[i] - grain_sizes[i+1]) > 0.5:
            lemons += 1
        lemons_arr[i] = lemons
        val = nmax_lemon - lemons + Sk38[i+1]
        ssi[i] = max(0.05, min(max_stab, val))
    
    # Determine weak-layer location and values
    min_Sn38_idx = np.argmin(Sn38)
    min_Sk38_idx = np.argmin(Sk38)
    min_ssi_idx = np.argmin(ssi)
    Swl_ssi = ssi[min_ssi_idx]
    Swl_lemon = int(lemons_arr[min_ssi_idx])
    Swl_Sk38 = Sk38[min_ssi_idx+1]
    # Profile type: apply surface hoar flag, fallback on error
    if not (Swl_ssi > 0 and Swl_ssi < 100):
        profile_type = -1
    else:
        profile_type = 2 if np.any(primary_gt == 6) else 1
    # Stability class via SchweizerBellaire2
    if Swl_ssi > 0 and Swl_ssi < 100:
        if Swl_lemon >= 2:
            stability_class = 1
        elif Swl_lemon == 1:
            if Swl_Sk38 < 0.48:
                stability_class = 1
            elif Swl_Sk38 < 0.71:
                stability_class = 3
            else:
                stability_class = 5
        else:
            stability_class = 3
    else:
        stability_class = -1
    # Format output according to 0530 code spec
    result = np.array([
        profile_type,
        stability_class,
        heights[min_ssi_idx],   # z_Sdef
        ssi[min_ssi_idx],       # Sdef (SSI)
        heights[min_Sn38_idx],  # z_Sn38
        Sn38[min_Sn38_idx],     # Sn38
        heights[min_Sk38_idx],  # z_Sk38
        Sk38[min_Sk38_idx]      # Sk38
    ])
    
    # Store result in weak_layer property
    self.weak_layer = result
    self.set_param("0530", result, len(result))
    self.set_param("0604", ssi, len(ssi))
    
    return result
def discard_below_ground(self, discard: bool)

Sets whether to return data below ground level.

If set to true, only data above ground will be returned, by the getter methods. Otherwise all the data will be returned.

Can be used subsequently.

Args

discard : bool
If True, data below ground level will be discarded. If False, all data will be kept.

Returns

None

Expand source code
def discard_below_ground(self, discard: bool):
    """
    Sets whether to return data below ground level.

    If set to true, only data above ground will be returned, by the getter methods.
    Otherwise all the data will be returned.

    Can be used subsequently.

    Args:
        discard (bool): If True, data below ground level will be discarded. If False, all data will be kept.

    Returns:
        None
    """
    self._above_ground = discard
def get_param(self, code: str, return_missing: bool = False)

Retrieves the parameter associated with the given code.

Args

code : str
The code of the parameter to retrieve.
return_missing : bool
Will return a missing value (-999) instead of showing a warning when a variable is not present

Returns

The parameter associated with the given code.

Raises

KeyError
If the code is not found in the data.
Expand source code
def get_param(self, code: str, return_missing: bool = False):
    """
    Retrieves the parameter associated with the given code.

    Args:
        code (str): The code of the parameter to retrieve.
        return_missing (bool): Will return a missing value (-999) instead of showing a warning when a variable is not present

    Returns:
        The parameter associated with the given code.

    Raises:
        KeyError: If the code is not found in the data.

    """
    # need to add these, because they were handled seperately and can also be called through different means
    possible_codes = list(self.data.keys()) + ["0501", "0530", "0514"]
    if not code in possible_codes:
        if return_missing:
            return np.full(len(self.data["layer middle"]),-999.0) # The Snowpack Missing Value
        print(f"{code} is invalid")
        print("available codes are:")
        print(f"{self.data.keys()}")
        return
        
    if code == "0501":
        if self._above_ground:
            mask = np.append(self._height_mask,True)
            return self.layer_boundaries[mask]
        else:
            return self.layer_boundaries

    if code == "0514":
        return self.surface_hoar
    if code == "0530":
        return self.weak_layer

    param = self.data[code]
    if self._above_ground:
        param = param[self._height_mask]
    return param
def set_param(self, code: str, values: numpy.ndarray, boundaries: int)
Expand source code
def set_param(self, code: str, values: np.ndarray, boundaries: int):
    if not self.num_nodes:
        self.num_nodes = boundaries
    if code == "0501":
        self.layer_boundaries = values
        self._height_mask = self.layer_boundaries >= 0
    elif code == "0514":
        self.surface_hoar = values
    elif code == "0530":
        self.weak_layer = values
    else:
        self.data[code] = values
def toDf(self, CodesToName: Dict = None, integrate: bool = False)

Converts the Snowpack object to a pandas DataFrame.

In the data frame the heights given in the Snowpack output, which essentially are the layer boundaries, are converted to the middle of the layers and the thickness of the layers, for a clean data frame. The original layer boundaries can easily be computed from that.

The minimum stability indices (weak_layer) and surface hoar information are available as: df.weak_layer and df.surface_hoar. However, this information will not be passed on when merging… the dataframe as pandas does not handle this yet.

Args

CodesToName : Dict, optional
A dictionary mapping column data codes to column names.

Returns

DataFrame
The Snowpack data as a pandas DataFrame.
Expand source code
def toDf(self, CodesToName: Dict = None, integrate: bool = False):
    """
    Converts the Snowpack object to a pandas DataFrame.

    In the data frame the heights given in the Snowpack output, which essentially are the layer boundaries,
    are converted to the middle of the layers and the thickness of the layers, for a clean data frame.
    The original layer boundaries can easily be computed from that.

    The minimum stability indices (weak_layer) and surface hoar information are available as:
    df.weak_layer and df.surface_hoar. However, this information will not be passed on when merging... the dataframe
    as pandas does not handle this yet.

    Args:
        CodesToName (Dict, optional): A dictionary mapping column data codes to column names.

    Returns:
        DataFrame: The Snowpack data as a pandas DataFrame.
    """
    df = pd.DataFrame(self.data)
    cols = (
        ["layer middle"]
        + ["layer thickness"]
        + [
            col
            for col in df.columns
            if col != "layer middle" and col != "layer thickness"
        ]
    )
    df = df[cols]
    if self._above_ground:
        df = df[self._height_mask]

    if CodesToName:
        df.rename(columns=CodesToName, inplace=True)
    if integrate:
        df.weak_layer = None
        df.surface_hoar = None
        if self.surface_hoar is not None:
            df["surface hoar"] = [
                np.nan for _ in range(df["layer middle"].size - 1)
            ] + [self.surface_hoar]
        if self.weak_layer is not None:
            df["weak layer"] = [self.weak_layer] + [
                np.nan for _ in range(df["layer middle"].size - 1)
            ]
    else:
        warnings.filterwarnings('ignore', 'Pandas doesn\'t allow columns to be created via a new attribute name')
        df.weak_layer = self.weak_layer
        df.surface_hoar = self.surface_hoar
    return df
def write_pro_file(self, filepath: str, station_params: dict = None, profile_date:  = None)

Writes the Snowpack data to a .pro file.

Args

filepath : str
Path to save the .pro file
station_params : dict, optional
Dictionary containing station parameters. Keys should be: StationName, Latitude, Longitude, Altitude, SlopeAngle, SlopeAzi If not provided, default values will be used.

Returns

None

Expand source code
def write_pro_file(self, filepath: str, station_params: dict = None, profile_date: datetime = None):
    """
    Writes the Snowpack data to a .pro file.

    Args:
        filepath (str): Path to save the .pro file
        station_params (dict, optional): Dictionary containing station parameters.
            Keys should be: StationName, Latitude, Longitude, Altitude, SlopeAngle, SlopeAzi
            If not provided, default values will be used.

    Returns:
        None
    """
    if not self._parsed:
        raise ValueError("Snowpack data has not been parsed yet.")

    # Default station parameters if not provided
    default_params = {
        "StationName": "Unknown",
        "Latitude": 0.0,
        "Longitude": 0.0,
        "Altitude": 0.0,
        "SlopeAngle": 0,
        "SlopeAzi": 0
    }
    station_params = station_params or default_params

    with open(filepath, 'w') as f:
        # Write station parameters
        f.write("[STATION_PARAMETERS]\n")
        for key, value in station_params.items():
            if key == "SlopeAzi" and value == None:
                value = 0
            f.write(f"{key}= {value}\n")
        f.write("\n")

        # Write header section
        f.write("[HEADER]\n")
        f.write("0500,Date\n")
        f.write("0501,nElems,height [> 0: top, < 0: bottom of elem.] (cm)\n")
        f.write("0502,nElems,element density (kg m-3)\n")
        f.write("0503,nElems,element temperature (degC)\n")
        f.write("0504,nElems,element ID (1)\n")
        f.write("0506,nElems,liquid water content by volume (%)\n")
        f.write("0508,nElems,dendricity (1)\n")
        f.write("0509,nElems,sphericity (1)\n")
        f.write("0510,nElems,coordination number (1)\n")
        f.write("0511,nElems,bond size (mm)\n")
        f.write("0512,nElems,grain size (mm)\n")
        f.write("0513,nElems,grain type (Swiss Code F1F2F3)\n")
        f.write("0514,3,grain type, grain size (mm), and density (kg m-3) of SH at surface\n")
        f.write("0515,nElems,ice volume fraction (%)\n")
        f.write("0516,nElems,air volume fraction (%)\n")
        f.write("0517,nElems,stress in (kPa)\n")
        f.write("0518,nElems,viscosity (GPa s)\n")
        f.write("0519,nElems,soil volume fraction (%)\n")
        f.write("0520,nElems,temperature gradient (K m-1)\n")
        f.write("0521,nElems,thermal conductivity (W K-1 m-1)\n")
        f.write("0522,nElems,absorbed shortwave radiation (W m-2)\n")
        f.write("0523,nElems,viscous deformation rate (1.e-6 s-1)\n")
        f.write("0530,8,position (cm) and minimum stability indices:\n")
        f.write("profile type, stability class, z_Sdef, Sdef, z_Sn38, Sn38, z_Sk38, Sk38\n")
        f.write("0531,nElems,deformation rate stability index Sdef\n")
        f.write("0532,nElems,natural stability index Sn38\n")
        f.write("0533,nElems,stability index Sk38\n")
        f.write("0534,nElems,hand hardness either (N) or index steps (1)\n")
        f.write("0535,nElems,optical equivalent grain size (mm)\n")
        f.write("0601,nElems,snow shear strength (kPa)\n")
        f.write("0602,nElems,grain size difference (mm)\n")
        f.write("0603,nElems,hardness difference (1)\n")
        f.write("0604,nElems,ssi\n")
        f.write("0605,nElems,inverse texture index ITI (Mg m-4)\n")
        f.write("0606,nElems,critical cut length (m)\n")
        f.write("\n")

        # Write data section
        f.write("[DATA]\n")
        
        # Write date
        current_date = profile_date.strftime("%d.%m.%Y %H:%M:%S")
        f.write(f"0500,{current_date}\n")

        # Write layer boundaries
        boundaries = self.get_param("0501")
        f.write(f"0501,{len(boundaries)},{','.join(map(str, boundaries))}\n")

        # Write all other parameters
        param_codes = [code for code in self.data.keys() if code not in ["layer middle", "layer thickness"]]
        for code in sorted(param_codes):
            values = self.get_param(code)
            if values is not None:
                if code == "0534" and not self.isNewton: # snowpack only gives 0.5 and hand hardness is negative
                    for (i,value) in enumerate(values):
                        if math.isclose(value%1, 1/3) or math.isclose(value%1, 2/3):
                            value = math.floor(value) + 0.5
                        values[i] = -1 * value
                if code == "0513":
                    values = np.append(values, -999) # for some reason we need surface here
                    if np.any(values <100) and np.any(values >0):
                        out_vals = [f"{val}" if val > 100 or val < 0 else f"000" for val in values]
                        values = out_vals
                f.write(f"{code},{len(values)},{','.join(map(str, values))}\n")

        # Write special codes (surface hoar and weak layer)
        if self.surface_hoar is not None:
            f.write(f"0514,3,{','.join(map(str, self.surface_hoar))}\n")
        if self.weak_layer is not None:
            f.write(f"0530,8,{','.join(map(str, self.weak_layer))}\n")
class SnowpackReader (filename: str)

A class for reading and parsing Snowpack files.

Initializes a SnowpackReader object.

The data and units are saved with the data codes as keys.

So to get a data column for a parameter use data[date][code] to get the data at a specific date and units[code] to get the unit.

Conversion from Codes to names and vice versa is available through the DataCodes and NamesToCodes dictionaries.

Args

filename : str
The path to the Snowpack file.
meaningful_names : bool, optional
If True, the data codes will be replaced with meaningful names given in the header, in the returned dataframe

Attributes

filename : str
The path to the Snowpack file.
metadata : dict, optional
A dictionary containing metadata information. Initialized as None.
DataCodes : dict
A dictionary containing default data codes. Initialized with get_default_codes().
data : dict
A dictionary to store the data read from the Snowpack file, each date is a key, and profiles are safed as Snowpack objects
units : dict
A dictionary to store the units of the data.

It is advised to use the Classes getter methods to get the data:

Methods

get_profile_on(date: datetime.datetime) -> Snowpack: Returns the data for the given date. get_all_profiles() -> List[Snowpack]: Returns all the profiles. get_var(code:str) -> List[np.ndarray]: Returns the data for the given code. CodeToName(code:str) -> str: Returns the name of the code.

If you do not want to have data below ground level, use the discard_below_ground method.

Methods

discard_below_ground(discard: bool): Getter and writers only use the above ground data.

To write to another file format use the toCSV and toHDF5 methods. It is advised to use the HDF5 format as it is much more robust Methods: toCSV(filename:str, integrate:bool=False): Writes the data to a CSV file. toHDF5(filename:str, integrate:bool=False): Writes the data to a HDF5 file.

Reader Methods to read the data from a file written by this class: readHDF5(filename:str) -> dict, dict: Reads the data from a HDF5 file. fromCSV(filename:str) -> dict, dict: Reads the data from a CSV file written by the toCSV method.

Expand source code
class SnowpackReader:
    """A class for reading and parsing Snowpack files."""
    
    def __init__(self, filename: str):
        """
        Initializes a SnowpackReader object.
        
        The data and units are saved with the data codes as keys.
        
        So to get a data column for a parameter use data[date][code] to get the data at a specific date and units[code] to get the unit.
        
        Conversion from Codes to names and vice versa is available through the DataCodes and NamesToCodes dictionaries.

        Args:
            filename (str): The path to the Snowpack file.
            meaningful_names (bool, optional): If True, the data codes will be replaced with meaningful names given in the header, in the returned dataframe

        Attributes:
            filename (str): The path to the Snowpack file.
            metadata (dict, optional): A dictionary containing metadata information. Initialized as None.
            DataCodes (dict): A dictionary containing default data codes. Initialized with get_default_codes().
            data (dict): A dictionary to store the data read from the Snowpack file, each date is a key, and profiles are safed as Snowpack objects
            units (dict): A dictionary to store the units of the data.

        It is advised to use the Classes getter methods to get the data:
        Methods:
            get_profile_on(date: datetime.datetime) -> Snowpack: Returns the data for the given date.
            get_all_profiles() -> List[Snowpack]: Returns all the profiles.
            get_var(code:str) -> List[np.ndarray]: Returns the data for the given code.
            CodeToName(code:str) -> str: Returns the name of the code.
        
        If you do not want to have data below ground level, use the discard_below_ground method.
        Methods:
            discard_below_ground(discard: bool): Getter and writers only use the above ground data.
        
        To write to another file format use the toCSV and toHDF5 methods. It is advised to use the HDF5 format as it is much more robust
        Methods: 
            toCSV(filename:str, integrate:bool=False): Writes the data to a CSV file.
            toHDF5(filename:str, integrate:bool=False): Writes the data to a HDF5 file.
            
        Reader Methods to read the data from a file written by this class:
            readHDF5(filename:str) -> dict, dict: Reads the data from a HDF5 file.
            fromCSV(filename:str) -> dict, dict: Reads the data from a CSV file written by the toCSV method.

        """
        self.filename = filename
        
        # file content
        self.metadata = {}
        self.DataCodes = {}
        self.units = {}
        self.data:Dict[datetime.datetime,Snowpack] = {}
        
        # internal help variables
        self.current_date = None
        self._old_hardness = False
        
        self._read_file()
        self._clear_mapping()
        self._parse_profiles()
        

    def _read_file(self):
        """Reads and parses the Snowpack file"""
        try:
            with open(self.filename, 'r') as file:
                self._parse_file(file)
        except (FileNotFoundError, PermissionError) as e:
            raise Exception(f"Error opening file: {e}")
            # Handle error appropriately

    def _parse_file(self, file):
        """
        Parses the Snowpack file line by line.

        Args:
            file (file object): The file object to parse.
        """
        current_section = None
        for line in file:
            line = line.strip()
            if not line or line.startswith('#'):  # Skip empty or comment lines
                continue
            current_section, new_section  = self._determine_section(line, current_section)
            if new_section:
                continue
            self._parse_sections(line, current_section)

    def _determine_section(self, line: str, current_section: str) -> str:
        """
        Determines the current section based on the line content.

        Args:
            line (str): The line to analyze.
            current_section (str): The current section.

        Returns:
            str: The updated current section.
        """
        if "STATION_PARAMETERS" in line:
            return 'metadata', True
        elif "HEADER" in line:
            return 'header', True
        elif "DATA" in line:
            return 'data', True
        return current_section, False
    
    def _parse_header(self, line:str, current_section:str):
        """
        Parses the header sections [STATION_PARAMETERS] and [HEADER].

        Parameters:
        - line (str): The line to parse.
        - current_section (str): The current section.
        """
        if current_section == "metadata":
            key, value = line.split('=')
            self.metadata[key.strip()] = value.strip()            
        elif current_section == 'header':
            line_vals = line.split(',')
            if line_vals[0] == '0500' and line_vals[1] == 'Date':
                self.DataCodes["0500"] = "Date"
            elif line_vals[0] == '0530':
                self.DataCodes["0530"] = "minimum stability indices (cm)"
            elif line_vals[0] == '0514':
                self.DataCodes["0514"] = "surface hoar ([Swiss Code, mm, kgm-3])"
            elif line_vals[0] == '0534':
                self._old_hardness = line_vals[2] == OLD_HEADER
            else:
                if len(line_vals) >= 3:
                    self.DataCodes[line_vals[0]] = ",".join(line_vals[2:])
                else:
                    self.DataCodes[line_vals[0]] = line_vals[2]

    def _parse_data_section(self, line:str):
        """Parses the [DATA] section"""
        
        vals = [val.strip() for val in line.split(',')]
        code = vals[0]
        if len(code)!=4: raise ValueError("Invalid data code: {}".format(code));
        if vals[1] == "nElems" or vals[1] == "grain":
            print("Incorrect header section, continuing anyways.")
            self._parse_header(line, "header")
        

        # parse the data lines    
        if code == '0500':
            self.current_date = datetime.datetime.strptime(vals[1], '%d.%m.%Y %H:%M:%S')
            self.data[self.current_date] = Snowpack()
            return None
        
        n_elements = int(vals[1])
        elements = vals[2:]
       
        elements_arr = np.array(elements).astype(float)        
        self.data[self.current_date].set_param(code, elements_arr, n_elements)
        
    def _parse_sections(self, line:str, current_section:str):
        if current_section == "metadata" or current_section == "header":
            self._parse_header(line, current_section)
        elif current_section == "data":
            self._parse_data_section(line)
        else:
            raise ValueError("Invalid section: {}".format(current_section))
        
        
    def _clear_mapping(self):
        """Clears the mapping of data codes"""
        for k, v in self.DataCodes.items():
            if "(" in v and ")" in v:
                unit = v.split("(")[1].split(")")[0]
            else:
                unit = "b.E."
            self.units[k] = unit  
            self.DataCodes[k] = v.split("(")[0].strip()
        self.DataCodes = {k : v.split("[")[0].strip() for k, v in self.DataCodes.items()}
        self.NamesToCodes = {v: k for k, v in self.DataCodes.items()}
        
    def _parse_profiles(self):
        """Parses the profiles"""
        for date in self.data:
            self.data[date]._parse_data(self._old_hardness)

    def update_name_of_code(self, code: str, name: str):
        """Updates the name of the code"""
        if code not in self.DataCodes:
            raise ValueError(f"{code} not found. Available codes are: {', '.join(self.DataCodes.keys())}")
        self.DataCodes[code] = name
        self.NamesToCodes[name] = code
    
    def name_to_code(self, name: str) -> str:
        """Returns the code for the given name"""
        try:
            return self.NamesToCodes[name]
        except KeyError:
            raise KeyError(f"{name} not found. Available names are: {', '.join(self.NamesToCodes.keys())}")
                
    def discard_below_ground(self, discard: bool = True):
        """Will only return the above ground data.

        Args:
            discard (bool): If True, below ground data will be discarded. If False, below ground data will be included.

        """
        for date in self.data:
            self.data[date].discard_below_ground(discard)
    
    def get_profile_nr(self, nr: int) -> Snowpack:
        """Returns the profile for the given number"""
        return self.data[self.get_all_dates()[nr]]
                
    def get_profile_on(self, date: datetime.datetime) -> Snowpack:
        """
        Returns the data for the given date.

        Args:
            date (datetime.datetime): The date for which to retrieve the data.

        Returns:
            pandas.DataFrame: The snowpack profile for the given date.
        """
        if date in self.get_all_dates():
            return self.data[date]
        else:
            return None

    def get_all_dates(self) -> List[datetime.datetime]:
            """Returns all the dates in the data.

            Returns:
                list: A list of all the dates in the data.
            """
            return list(self.data.keys())
    
    def get_all_profiles(self)-> List[Snowpack]:
        """
        Returns all the profiles.

        Returns:
            list: returns a list of profiles for each date.
        """
        return [self.get_profile_on(date) for date in self.get_all_dates()]
    
    
    def get_var(self, code:str, return_missing:bool = False ):
        """Returns the data for the given code"""
        return [self.get_profile_on(date).get_param(code, return_missing) for date in self.get_all_dates()]
    
    
    def toCSV(self, filename:str, integrate:bool=False):
            """Writes the data to a CSV file
            
            Args:
                filename (str): The name of the CSV file to write the data to.
                integrate (bool, optional): Whether to integrate the special information into the dataframe or keep it as additional information.

            Special information is 0514: surface hoar and 0530: minimum stabiliy index
            """
            # first write the metadata
            with open(filename, 'w') as file:
                file.write("# This file was created with the SnowpackReader library, which also provides a method to read this file.\n")
                file.write("# [METADATA]\n")
                for key, value in self.metadata.items():
                    file.write("# {} = {}\n".format(key, value))
                
                file.write("# [DATA]\n")
                for date in self.get_all_dates():
                    file.write("# Date = {}\n".format(date.strftime('%d.%m.%Y %H:%M:%S')))
                    profile = self.get_profile_on(date)
                    if profile.weak_layer is not None:
                        file.write(f"# 0530, weak layer, {', '.join(map(str, profile.weak_layer))}\n")         
                    if profile.surface_hoar is not None:
                        file.write(f"# 0514, surface hoar, {', '.join(map(str, profile.weak_layer))}\n")         
                    profile.toDf(integrate=integrate).to_csv(file, index=False, mode='a')
    
    def toHDF5(self, filename:str, integrate:bool = False):
            """Writes the data to a HDF5 file
            
            Args:
                filename (str): The name of the HDF5 file to write the data to.
                integrate (bool, optional): Whether to integrate the special information into the dataframe or keep it as additional information.
            
            Special information is 0514: surface hoar and 0530: minimum stabiliy index
            """
            with h5py.File(filename, 'w') as file:
                for key, value in self.metadata.items():
                    file.attrs[key] = value
                
                for date in self.get_all_dates():
                    group = file.create_group(date.strftime('%d.%m.%Y %H:%M:%S'))
                    profile = self.get_profile_on(date)
                    if profile.weak_layer is not None:
                        group.attrs['weak_layer'] = profile.weak_layer
                    if profile.surface_hoar is not None:
                        group.attrs['surface'] = profile.surface_hoar
                    df = profile.toDf(integrate=integrate)
                    col_names = list(df.columns)
                    arr = df.to_numpy()
                    group.attrs["fields"] = col_names
                    group.create_dataset("data", data=arr)

    def __str__(self):
        ss = "PRO File:\n"
        ss += "Filename: {}\n".format(self.filename)
        ss += "Metadata: {}\n".format(self.metadata)
        ss += "Data codes: {}\n".format(self.DataCodes)
        ss += "Units: {}\n".format(self.units)
        ss += "Dates: {}\n".format(self.get_all_dates())
        return ss
    
    def info(self):
        print(self)

Methods

def discard_below_ground(self, discard: bool = True)

Will only return the above ground data.

Args

discard : bool
If True, below ground data will be discarded. If False, below ground data will be included.
Expand source code
def discard_below_ground(self, discard: bool = True):
    """Will only return the above ground data.

    Args:
        discard (bool): If True, below ground data will be discarded. If False, below ground data will be included.

    """
    for date in self.data:
        self.data[date].discard_below_ground(discard)
def get_all_dates(self) ‑> List[datetime.datetime]

Returns all the dates in the data.

Returns

list
A list of all the dates in the data.
Expand source code
def get_all_dates(self) -> List[datetime.datetime]:
        """Returns all the dates in the data.

        Returns:
            list: A list of all the dates in the data.
        """
        return list(self.data.keys())
def get_all_profiles(self) ‑> List[snowpat.snowpackreader.Snowpack.Snowpack]

Returns all the profiles.

Returns

list
returns a list of profiles for each date.
Expand source code
def get_all_profiles(self)-> List[Snowpack]:
    """
    Returns all the profiles.

    Returns:
        list: returns a list of profiles for each date.
    """
    return [self.get_profile_on(date) for date in self.get_all_dates()]
def get_profile_nr(self, nr: int) ‑> snowpat.snowpackreader.Snowpack.Snowpack

Returns the profile for the given number

Expand source code
def get_profile_nr(self, nr: int) -> Snowpack:
    """Returns the profile for the given number"""
    return self.data[self.get_all_dates()[nr]]
def get_profile_on(self, date: datetime.datetime) ‑> snowpat.snowpackreader.Snowpack.Snowpack

Returns the data for the given date.

Args

date : datetime.datetime
The date for which to retrieve the data.

Returns

pandas.DataFrame
The snowpack profile for the given date.
Expand source code
def get_profile_on(self, date: datetime.datetime) -> Snowpack:
    """
    Returns the data for the given date.

    Args:
        date (datetime.datetime): The date for which to retrieve the data.

    Returns:
        pandas.DataFrame: The snowpack profile for the given date.
    """
    if date in self.get_all_dates():
        return self.data[date]
    else:
        return None
def get_var(self, code: str, return_missing: bool = False)

Returns the data for the given code

Expand source code
def get_var(self, code:str, return_missing:bool = False ):
    """Returns the data for the given code"""
    return [self.get_profile_on(date).get_param(code, return_missing) for date in self.get_all_dates()]
def info(self)
Expand source code
def info(self):
    print(self)
def name_to_code(self, name: str) ‑> str

Returns the code for the given name

Expand source code
def name_to_code(self, name: str) -> str:
    """Returns the code for the given name"""
    try:
        return self.NamesToCodes[name]
    except KeyError:
        raise KeyError(f"{name} not found. Available names are: {', '.join(self.NamesToCodes.keys())}")
def toCSV(self, filename: str, integrate: bool = False)

Writes the data to a CSV file

Args

filename : str
The name of the CSV file to write the data to.
integrate : bool, optional
Whether to integrate the special information into the dataframe or keep it as additional information.

Special information is 0514: surface hoar and 0530: minimum stabiliy index

Expand source code
def toCSV(self, filename:str, integrate:bool=False):
        """Writes the data to a CSV file
        
        Args:
            filename (str): The name of the CSV file to write the data to.
            integrate (bool, optional): Whether to integrate the special information into the dataframe or keep it as additional information.

        Special information is 0514: surface hoar and 0530: minimum stabiliy index
        """
        # first write the metadata
        with open(filename, 'w') as file:
            file.write("# This file was created with the SnowpackReader library, which also provides a method to read this file.\n")
            file.write("# [METADATA]\n")
            for key, value in self.metadata.items():
                file.write("# {} = {}\n".format(key, value))
            
            file.write("# [DATA]\n")
            for date in self.get_all_dates():
                file.write("# Date = {}\n".format(date.strftime('%d.%m.%Y %H:%M:%S')))
                profile = self.get_profile_on(date)
                if profile.weak_layer is not None:
                    file.write(f"# 0530, weak layer, {', '.join(map(str, profile.weak_layer))}\n")         
                if profile.surface_hoar is not None:
                    file.write(f"# 0514, surface hoar, {', '.join(map(str, profile.weak_layer))}\n")         
                profile.toDf(integrate=integrate).to_csv(file, index=False, mode='a')
def toHDF5(self, filename: str, integrate: bool = False)

Writes the data to a HDF5 file

Args

filename : str
The name of the HDF5 file to write the data to.
integrate : bool, optional
Whether to integrate the special information into the dataframe or keep it as additional information.

Special information is 0514: surface hoar and 0530: minimum stabiliy index

Expand source code
def toHDF5(self, filename:str, integrate:bool = False):
        """Writes the data to a HDF5 file
        
        Args:
            filename (str): The name of the HDF5 file to write the data to.
            integrate (bool, optional): Whether to integrate the special information into the dataframe or keep it as additional information.
        
        Special information is 0514: surface hoar and 0530: minimum stabiliy index
        """
        with h5py.File(filename, 'w') as file:
            for key, value in self.metadata.items():
                file.attrs[key] = value
            
            for date in self.get_all_dates():
                group = file.create_group(date.strftime('%d.%m.%Y %H:%M:%S'))
                profile = self.get_profile_on(date)
                if profile.weak_layer is not None:
                    group.attrs['weak_layer'] = profile.weak_layer
                if profile.surface_hoar is not None:
                    group.attrs['surface'] = profile.surface_hoar
                df = profile.toDf(integrate=integrate)
                col_names = list(df.columns)
                arr = df.to_numpy()
                group.attrs["fields"] = col_names
                group.create_dataset("data", data=arr)
def update_name_of_code(self, code: str, name: str)

Updates the name of the code

Expand source code
def update_name_of_code(self, code: str, name: str):
    """Updates the name of the code"""
    if code not in self.DataCodes:
        raise ValueError(f"{code} not found. Available codes are: {', '.join(self.DataCodes.keys())}")
    self.DataCodes[code] = name
    self.NamesToCodes[name] = code