Module snowpat.snowpackreader
Expand source code
from .snowpackreader import readPRO, readHDF5, SnowpackReader, readCSV
from .Snowpack import Snowpack
from .json_converter import read_json_profile
__all__ = ['readPRO', 'readHDF5', 'SnowpackReader', 'readCSV', 'Snowpack', 'read_json_profile', 'Snowpack']
Sub-modules
snowpat.snowpackreader.json_convertersnowpat.snowpackreader.snowpackreader
Functions
def readCSV(filename: str)-
Reads the data from a CSV file written by the toCSV method
Args
filename:str- The name of the CSV file to read the data from.
Returns
dict- The metadata.
dict- A dictionary where each date contains a dataframe with keys: data, weak_layer, surface_hoar. data is the dataframe for the respective date, and weak_layer and surface_hoar contain (if available) numpy arrays with the respective information
Expand source code
def readCSV(filename:str): """Reads the data from a CSV file written by the toCSV method Args: filename (str): The name of the CSV file to read the data from. Returns: dict: The metadata. dict: A dictionary where each date contains a dataframe with keys: data, weak_layer, surface_hoar. data is the dataframe for the respective date, and weak_layer and surface_hoar contain (if available) numpy arrays with the respective information """ metadata = {} dataframes = {} df_start_lines = [0] df_end_lines = [] counter = 0 with open(filename, 'r') as file: lines = file.readlines() date = None section = "" for line_id, line in enumerate(lines): if "METADATA" in line: section = "metadata" continue elif "DATA" in line: section = "data" continue if line.startswith('#') and section == "metadata": key, value = line[2:].strip().split(' = ') metadata[key] = value elif line.startswith('#') and section == "data": if "Date" in line: date = datetime.datetime.strptime(line.split('=')[1].strip(), '%d.%m.%Y %H:%M:%S') dataframes[date] = {"data": []} counter = line_id df_end_lines.append(line_id) elif "weak layer" in line: dataframes[date]["weak_layer"] = np.array(line.split(',')[2:], dtype=float) counter = line_id elif "surface hoar" in line: dataframes[date]["surface_hoar"] = np.array(line.split(',')[2:],dtype=float) counter = line_id else: if counter not in df_start_lines: df_start_lines.append(counter) df_start_lines.pop(0) df_end_lines.pop(0) for id, date in enumerate(dataframes.keys()): dataframes[date]["data"] = pd.read_csv(filename, skiprows=df_start_lines[id], nrows=df_end_lines[id]-df_start_lines[id]-1) return metadata, dataframes def readHDF5(filename: str)-
Reads the data from a HDF5 file
Args
filename:str- The name of the HDF5 file to read the data from.
Returns
dict- The metadata.
dict- A dictionary where each date contains a dataframe with the profile.
Expand source code
def readHDF5(filename:str): """Reads the data from a HDF5 file Args: filename (str): The name of the HDF5 file to read the data from. Returns: dict: The metadata. dict: A dictionary where each date contains a dataframe with the profile. """ metadata = {} data = {} with h5py.File(filename, 'r') as file: # Read metadata for key, value in file.attrs.items(): metadata[key] = value # Read data for date in file.keys(): group = file[date] weak_layer = group.attrs.get('weak_layer', None) surface_hoar = group.attrs.get("surface_hoar", None) col_names = group.attrs.get("fields") dat = group.get("data") data[date] = pd.DataFrame(dat[:], columns=col_names) return metadata, data def readPRO(filename: str)-
Reads the snowpack file and returns a SnowpackReader object.
Args
filename:str- The path to the snowpack file.
Returns
SnowpackReader- The SnowpackReader object representing the snowpack file.
Expand source code
def readPRO(filename:str): """ Reads the snowpack file and returns a SnowpackReader object. Args: filename (str): The path to the snowpack file. Returns: SnowpackReader: The SnowpackReader object representing the snowpack file. """ return SnowpackReader(filename) def read_json_profile(json_file: str, max_layer_size: Optional[float] = None) ‑> Tuple[snowpat.snowpackreader.Snowpack.Snowpack, Dict, datetime.datetime]-
Read a snowpack profile from a JSON file and convert it to a Snowpack object.
Args
json_file:str- Path to the JSON file containing the snowpack profile
max_layer_size:Optional[float]- Maximum allowed layer size. Layers exceeding this will be split.
Returns
Tuple[Snowpack, Dict, datetime.datetime]- A tuple containing: - Snowpack object with the profile data - Dictionary with station metadata - Profile date
Expand source code
def read_json_profile(json_file: str, max_layer_size: Optional[float] = None) -> Tuple[Snowpack, Dict, datetime.datetime]: """ Read a snowpack profile from a JSON file and convert it to a Snowpack object. Args: json_file (str): Path to the JSON file containing the snowpack profile max_layer_size (Optional[float]): Maximum allowed layer size. Layers exceeding this will be split. Returns: Tuple[Snowpack, Dict, datetime.datetime]: A tuple containing: - Snowpack object with the profile data - Dictionary with station metadata - Profile date """ with open(json_file, 'r') as f: data = json.load(f) # Extract station metadata metadata = { "StationName": data["name"], "Latitude": data["position"]["latitude"], "Longitude": data["position"]["longitude"], "Altitude": data["position"]["altitude"], "SlopeAngle": data["position"]["angle"], "SlopeAzi": data["position"]["_azimuth"] } # Create Snowpack object snowpack = Snowpack() # Get the first profile (assuming single date) profile = data["profiles"][0] # Get profile date profile_date = datetime.datetime.fromisoformat(profile["date"]) # Process layer data layers = [] # Extract layers from hardness data (which contains the layer structure) hardness_data = profile["hardness"]["elements"][0]["layers"] # Sort layers by bottom height to ensure correct ordering hardness_data.sort(key=lambda x: x["bottom"]) duplicate_layers = [] # Convert layer boundaries to format needed by Snowpack boundaries = [] for layer in hardness_data: if not boundaries: # First layer boundaries.append(layer["bottom"]) boundaries.append(layer["top"]) # Convert hardness values to Newton scale if needed (assuming values > 6 indicate Newton scale) hardness_values = [] is_newton = any(layer["value"] > 6 for layer in hardness_data) snowpack.isNewton = is_newton for layer in hardness_data: hardness_values.append(layer["value"]) n_duplications = [] if max_layer_size is not None: new_boundaries = [] for i in range(len(boundaries) - 1): layer_size = boundaries[i+1] - boundaries[i] if layer_size > max_layer_size: # Calculate splits using numpy for better numerical precision n_splits = math.ceil(layer_size / max_layer_size) split_points = np.linspace(boundaries[i], boundaries[i+1], n_splits + 1, dtype=np.float64) # Add all points except the last one (it will be added in the next iteration) new_boundaries.extend(split_points[:-1]) else: n_splits = 1 new_boundaries.append(boundaries[i]) n_duplications.append(n_splits) # Add final boundary new_boundaries.append(boundaries[-1]) # Convert to numpy arrays boundaries = np.array(new_boundaries, dtype=np.float64) else: boundaries = np.array(boundaries, dtype=np.float64) hardness_values = np.array(hardness_values, dtype=np.float64) if n_duplications: assert len(n_duplications) == len(hardness_values), "Number of duplications does not match number of hardness values" hardness_values = np.repeat(hardness_values, n_duplications) # Set layer boundaries and hardness snowpack.set_param("0501", boundaries, len(boundaries)) snowpack.set_param("0534", hardness_values, len(hardness_values)) # make new fields, so it is clearer, where the layers actually are layer_middle = [ (snowpack.layer_boundaries[i + 1] + snowpack.layer_boundaries[i]) / 2 for i in range(snowpack.layer_boundaries.size - 1) ] layer_thicknes = [ (snowpack.layer_boundaries[i + 1] - snowpack.layer_boundaries[i]) / 2 for i in range(snowpack.layer_boundaries.size - 1) ] layer_middle = np.array(layer_middle) layer_thicknes = np.array(layer_thicknes) snowpack.data["layer middle"] = layer_middle snowpack.data["layer thickness"] = layer_thicknes if len(snowpack._height_mask) > len(layer_middle): snowpack._height_mask = np.delete(snowpack._height_mask, -1) # Process temperature data - the only property that should be interpolated if "temperature" in profile: temp_data = profile["temperature"]["elements"][0]["layers"] temp_values = [] temp_pos = [] for layer in temp_data: temp_values.append(layer["value"]) temp_pos.append(layer["bottom"]) # Convert to numpy arrays for efficient interpolation temp_pos = np.array(temp_pos, dtype=np.float64) temp_values = np.array(temp_values, dtype=np.float64) # Interpolate temperatures to layer midpoints temp_values = np.interp(layer_middle, temp_pos, temp_values) snowpack.set_param("0503", temp_values, len(temp_values)) # Process density data - replicate values for split layers if "density" in profile: density_data = profile["density"]["elements"][0]["layers"] density_top = [] density_bottom = [] density_values = [] for layer in density_data: if "value" in layer and layer["value"] is not None: density_values.append(layer["value"]) density_top.append(layer["top"]) density_bottom.append(layer["bottom"]) else: density_values.append(0.0) # Default value for missing data if density_values: # Convert to numpy array density_values = np.array(density_values, dtype=np.float64) density_top = np.array(density_top, dtype=np.float64) density_bottom = np.array(density_bottom, dtype=np.float64) density_boundaries = np.unique(np.concatenate([density_top, density_bottom])) density_boundaries.sort() if len(density_values) != len(layer_middle) or not all(density_boundaries == snowpack.layer_boundaries): print("Computing Mean densities for layers, provided density layers do not match layer boundaries") new_density_values = np.zeros(len(layer_middle)) last_density_pos = 0 for i in range(len(layer_middle)): midpoint = layer_middle[i] thickness = layer_thicknes[i] layer_bottom = midpoint - thickness / 2 layer_top = midpoint + thickness / 2 # Weighted mean: compute overlap of density intervals with this layer total_weight = 0.0 weighted_sum = 0.0 d_top = density_top[last_density_pos] d_bottom = density_bottom[last_density_pos] if d_top < layer_bottom: last_density_pos = np.where(density_top >= layer_bottom)[0][0] if not (last_density_pos > 0): raise ValueError("Could not match density layers with layer boundaries") d_top = density_top[last_density_pos] d_bottom = density_bottom[last_density_pos] if d_bottom > layer_top: continue while d_bottom < layer_top: layer_overlap = d_top - d_bottom if d_top < layer_top else layer_top - d_bottom total_weight += layer_overlap weighted_sum += layer_overlap * density_values[last_density_pos] d_bottom = d_top if d_bottom > layer_top: break last_density_pos += 1 if last_density_pos >= len(density_values): warnings.warn("Not enough density layers to match with layer boundaries") break d_top = density_top[last_density_pos] d_bottom = density_bottom[last_density_pos] if total_weight > 0: new_density_values[i] = weighted_sum / total_weight else: new_density_values[i] = 0.0 density_values = new_density_values elif n_duplications: assert len(density_values) == len(n_duplications), "Density values and duplications do not match, something went wrong when assigning layer splits" density_values = np.repeat(density_values, n_duplications) snowpack.set_param("0502", density_values, len(density_values)) # Process grain shape data - replicate values for split layers if "grainshape" in profile: grain_data = profile["grainshape"]["elements"][0]["layers"] grain_codes = [] for layer in grain_data: # Convert grain shape codes to Swiss Code format if isinstance(layer["value"], dict): primary = layer["value"]["primary"] secondary = layer["value"].get("secondary", "") tertiary_code = 0 # Special case for MFcr if primary == "MFcr": primary_code = 7 secondary_code = TYPES_TO_CODE.get(secondary, primary_code) tertiary_code = 2 else: primary_code = TYPES_TO_CODE.get(primary, -1) secondary_code = TYPES_TO_CODE.get(secondary, primary_code) if primary_code == -1: primary_code = -9 secondary_code = 9 tertiary_code = 9 code = int(f"{primary_code}{secondary_code}{tertiary_code}") else: code = TYPES_TO_CODE.get(layer["value"], -999) grain_codes.append(code) if grain_codes: # Convert to numpy array grain_codes = np.array(grain_codes, dtype=np.int16) if n_duplications: assert len(grain_codes) == len(n_duplications), "Grain codes and duplications do not match, something went wrong when assigning layer splits" grain_codes = np.repeat(grain_codes, n_duplications) snowpack.set_param("0513", grain_codes, len(grain_codes)) # Process grain size data - replicate values for split layers if "grainsize" in profile: size_data = profile["grainsize"]["elements"][0]["layers"] size_values = [] for layer in size_data: if "value" in layer and layer["value"] is not None: size_values.append(layer["value"]["avg"]) else: size_values.append(0.0) # Default value for missing data if size_values: # Convert to numpy array size_values = np.array(size_values, dtype=np.float64) if n_duplications: assert len(size_values) == len(n_duplications), "Grain sizes and duplications do not match, something went wrong when assigning layer splits" size_values = np.repeat(size_values, n_duplications) snowpack.set_param("0512", size_values, len(size_values)) # Process wetness data - replicate values for split layers if "wetness" in profile: wetness_data = profile["wetness"]["elements"][0]["layers"] wetness_values = [] for layer in wetness_data: if "value" in layer and layer["value"] is not None: wetness_values.append(layer["value"]) else: wetness_values.append(0.0) # Default value for missing data if wetness_values: # Convert to numpy array wetness_values = np.array(wetness_values, dtype=np.float64) if n_duplications: assert len(wetness_values) == len(n_duplications), "Wetness values and duplications do not match, something went wrong when assigning layer splits" wetness_values = np.repeat(wetness_values, n_duplications) snowpack.set_param("0506", wetness_values, len(wetness_values)) # stability index snowpack._parsed = True # Calculate stability indices try: print("Calculating stability indices...") stability_indices = snowpack.calculate_stability_indices() snowpack.weak_layer = stability_indices except Exception as e: warnings.warn(f"Could not calculate stability indices: {str(e)}") return snowpack, metadata, profile_date
Classes
class Snowpack-
Initializes a Snowpack object. Used to handle snow profile data from Snowpack.
Except for obtaining the original height in the .pro file it is advised to only use the getter methods to access the data.
Attributes
layer_boundaries:np.ndarray- The height of the layer boundaries. (Height as written in the Snowpack output)
Methods
get_param(code:str): Returns a parameter from the Snowpack object for the given data code. discard_below_ground(discard:bool): Sets whether to return data below ground level (default: True). toDf(integrate:bool): Converts the Snowpack object to a pandas DataFrame. With either integrated weak layer and surface hoar or as metadata accssible as: df.weak_layer; df.surface_hoar
Args
None
Returns
None
Expand source code
class Snowpack: def __init__(self): """ Initializes a Snowpack object. Used to handle snow profile data from Snowpack. Except for obtaining the original height in the .pro file it is advised to only use the getter methods to access the data. Attributes: layer_boundaries (np.ndarray): The height of the layer boundaries. (Height as written in the Snowpack output) Methods: get_param(code:str): Returns a parameter from the Snowpack object for the given data code. discard_below_ground(discard:bool): Sets whether to return data below ground level (default: True). toDf(integrate:bool): Converts the Snowpack object to a pandas DataFrame. With either integrated weak layer and surface hoar or as metadata accssible as: df.weak_layer; df.surface_hoar Args: None Returns: None """ self.layer_boundaries: Optional[np.ndarray] = None self.data: Dict[str, np.ndarray] = {} self.surface_hoar: Optional[np.ndarray] = None self.weak_layer: Optional[np.ndarray] = None self.isNewton: Optional[bool] = None self.old_hardness: Optional[bool] = None # internal variables self._height_mask: Optional[np.ndarray] = None self._above_ground = False self.num_nodes = None self._parsed = False def set_param(self, code: str, values: np.ndarray, boundaries: int): if not self.num_nodes: self.num_nodes = boundaries if code == "0501": self.layer_boundaries = values self._height_mask = self.layer_boundaries >= 0 elif code == "0514": self.surface_hoar = values elif code == "0530": self.weak_layer = values else: self.data[code] = values def get_param(self, code: str, return_missing: bool = False): """ Retrieves the parameter associated with the given code. Args: code (str): The code of the parameter to retrieve. return_missing (bool): Will return a missing value (-999) instead of showing a warning when a variable is not present Returns: The parameter associated with the given code. Raises: KeyError: If the code is not found in the data. """ # need to add these, because they were handled seperately and can also be called through different means possible_codes = list(self.data.keys()) + ["0501", "0530", "0514"] if not code in possible_codes: if return_missing: return np.full(len(self.data["layer middle"]),-999.0) # The Snowpack Missing Value print(f"{code} is invalid") print("available codes are:") print(f"{self.data.keys()}") return if code == "0501": if self._above_ground: mask = np.append(self._height_mask,True) return self.layer_boundaries[mask] else: return self.layer_boundaries if code == "0514": return self.surface_hoar if code == "0530": return self.weak_layer param = self.data[code] if self._above_ground: param = param[self._height_mask] return param def discard_below_ground(self, discard: bool): """ Sets whether to return data below ground level. If set to true, only data above ground will be returned, by the getter methods. Otherwise all the data will be returned. Can be used subsequently. Args: discard (bool): If True, data below ground level will be discarded. If False, all data will be kept. Returns: None """ self._above_ground = discard def _parse_data(self, old_hardness:bool): # snowpack sometimes does not explicitly put a boundary at 0, so we need to append that if self.layer_boundaries[0] > 0: self.num_nodes += 1 self.layer_boundaries = np.insert(self.layer_boundaries, 0, 0) self._height_mask = np.insert(self._height_mask, 0, True) # nodes give the boundaries, but values are valid for the whole layer n_layers = self.num_nodes -1 for key, val in self.data.items(): # grain types has surface hoar as 0, and is specified with a dfferent code if key == "0513" and len(val) == n_layers + 1: self.data[key] = np.delete(val, -1) # fill missing layers with nans if self.data[key].size != n_layers: try: self.data[key] = np.insert( self.data[key], 0, [np.nan for _ in range(n_layers - self.data[key].size)] ) except: print(n_layers) print(self.data[key].size) print(f"{key} has {self.data[key].size} values, but {n_layers} layers") # make new fields, so it is clearer, where the layers actually are layer_middle = [ (self.layer_boundaries[i + 1] + self.layer_boundaries[i]) / 2 for i in range(self.layer_boundaries.size - 1) ] layer_thicknes = [ (self.layer_boundaries[i + 1] - self.layer_boundaries[i]) / 2 for i in range(self.layer_boundaries.size - 1) ] layer_middle = np.array(layer_middle) layer_thicknes = np.array(layer_thicknes) self.data["layer middle"] = layer_middle self.data["layer thickness"] = layer_thicknes if len(self._height_mask) > n_layers: self._height_mask = np.delete(self._height_mask, -1) # check how the hardness is specified if "0534" in self.data.keys(): if old_hardness: self.isNewton = all(self.data["0534"] > 0) self.old_hardness = True else: self.isNewton = any(self.data["0534"] > 6) self.old_hardness = False else: self.old_hardness = True def toDf(self, CodesToName: Dict = None, integrate: bool = False): """ Converts the Snowpack object to a pandas DataFrame. In the data frame the heights given in the Snowpack output, which essentially are the layer boundaries, are converted to the middle of the layers and the thickness of the layers, for a clean data frame. The original layer boundaries can easily be computed from that. The minimum stability indices (weak_layer) and surface hoar information are available as: df.weak_layer and df.surface_hoar. However, this information will not be passed on when merging... the dataframe as pandas does not handle this yet. Args: CodesToName (Dict, optional): A dictionary mapping column data codes to column names. Returns: DataFrame: The Snowpack data as a pandas DataFrame. """ df = pd.DataFrame(self.data) cols = ( ["layer middle"] + ["layer thickness"] + [ col for col in df.columns if col != "layer middle" and col != "layer thickness" ] ) df = df[cols] if self._above_ground: df = df[self._height_mask] if CodesToName: df.rename(columns=CodesToName, inplace=True) if integrate: df.weak_layer = None df.surface_hoar = None if self.surface_hoar is not None: df["surface hoar"] = [ np.nan for _ in range(df["layer middle"].size - 1) ] + [self.surface_hoar] if self.weak_layer is not None: df["weak layer"] = [self.weak_layer] + [ np.nan for _ in range(df["layer middle"].size - 1) ] else: warnings.filterwarnings('ignore', 'Pandas doesn\'t allow columns to be created via a new attribute name') df.weak_layer = self.weak_layer df.surface_hoar = self.surface_hoar return df def write_pro_file(self, filepath: str, station_params: dict = None, profile_date: datetime = None): """ Writes the Snowpack data to a .pro file. Args: filepath (str): Path to save the .pro file station_params (dict, optional): Dictionary containing station parameters. Keys should be: StationName, Latitude, Longitude, Altitude, SlopeAngle, SlopeAzi If not provided, default values will be used. Returns: None """ if not self._parsed: raise ValueError("Snowpack data has not been parsed yet.") # Default station parameters if not provided default_params = { "StationName": "Unknown", "Latitude": 0.0, "Longitude": 0.0, "Altitude": 0.0, "SlopeAngle": 0, "SlopeAzi": 0 } station_params = station_params or default_params with open(filepath, 'w') as f: # Write station parameters f.write("[STATION_PARAMETERS]\n") for key, value in station_params.items(): if key == "SlopeAzi" and value == None: value = 0 f.write(f"{key}= {value}\n") f.write("\n") # Write header section f.write("[HEADER]\n") f.write("0500,Date\n") f.write("0501,nElems,height [> 0: top, < 0: bottom of elem.] (cm)\n") f.write("0502,nElems,element density (kg m-3)\n") f.write("0503,nElems,element temperature (degC)\n") f.write("0504,nElems,element ID (1)\n") f.write("0506,nElems,liquid water content by volume (%)\n") f.write("0508,nElems,dendricity (1)\n") f.write("0509,nElems,sphericity (1)\n") f.write("0510,nElems,coordination number (1)\n") f.write("0511,nElems,bond size (mm)\n") f.write("0512,nElems,grain size (mm)\n") f.write("0513,nElems,grain type (Swiss Code F1F2F3)\n") f.write("0514,3,grain type, grain size (mm), and density (kg m-3) of SH at surface\n") f.write("0515,nElems,ice volume fraction (%)\n") f.write("0516,nElems,air volume fraction (%)\n") f.write("0517,nElems,stress in (kPa)\n") f.write("0518,nElems,viscosity (GPa s)\n") f.write("0519,nElems,soil volume fraction (%)\n") f.write("0520,nElems,temperature gradient (K m-1)\n") f.write("0521,nElems,thermal conductivity (W K-1 m-1)\n") f.write("0522,nElems,absorbed shortwave radiation (W m-2)\n") f.write("0523,nElems,viscous deformation rate (1.e-6 s-1)\n") f.write("0530,8,position (cm) and minimum stability indices:\n") f.write("profile type, stability class, z_Sdef, Sdef, z_Sn38, Sn38, z_Sk38, Sk38\n") f.write("0531,nElems,deformation rate stability index Sdef\n") f.write("0532,nElems,natural stability index Sn38\n") f.write("0533,nElems,stability index Sk38\n") f.write("0534,nElems,hand hardness either (N) or index steps (1)\n") f.write("0535,nElems,optical equivalent grain size (mm)\n") f.write("0601,nElems,snow shear strength (kPa)\n") f.write("0602,nElems,grain size difference (mm)\n") f.write("0603,nElems,hardness difference (1)\n") f.write("0604,nElems,ssi\n") f.write("0605,nElems,inverse texture index ITI (Mg m-4)\n") f.write("0606,nElems,critical cut length (m)\n") f.write("\n") # Write data section f.write("[DATA]\n") # Write date current_date = profile_date.strftime("%d.%m.%Y %H:%M:%S") f.write(f"0500,{current_date}\n") # Write layer boundaries boundaries = self.get_param("0501") f.write(f"0501,{len(boundaries)},{','.join(map(str, boundaries))}\n") # Write all other parameters param_codes = [code for code in self.data.keys() if code not in ["layer middle", "layer thickness"]] for code in sorted(param_codes): values = self.get_param(code) if values is not None: if code == "0534" and not self.isNewton: # snowpack only gives 0.5 and hand hardness is negative for (i,value) in enumerate(values): if math.isclose(value%1, 1/3) or math.isclose(value%1, 2/3): value = math.floor(value) + 0.5 values[i] = -1 * value if code == "0513": values = np.append(values, -999) # for some reason we need surface here if np.any(values <100) and np.any(values >0): out_vals = [f"{val}" if val > 100 or val < 0 else f"000" for val in values] values = out_vals f.write(f"{code},{len(values)},{','.join(map(str, values))}\n") # Write special codes (surface hoar and weak layer) if self.surface_hoar is not None: f.write(f"0514,3,{','.join(map(str, self.surface_hoar))}\n") if self.weak_layer is not None: f.write(f"0530,8,{','.join(map(str, self.weak_layer))}\n") def calculate_stability_indices(self) -> np.ndarray: """ Calculate stability indices (SSI, Sk38, Sn38) for the snowpack. Returns array with format: [profile_type, stability_class, z_Sdef, Sdef, z_Sn38, Sn38, z_Sk38, Sk38] Based on: - Schweizer et al. (2006) - A threshold sum approach to stability evaluation of manual profiles - Monti et al. (2016) - Snow instability evaluation: calculating the skier-induced stress in a multi-layered snowpack - Schweizer and Jamieson (2003) - Snowpack properties for snow profile analysis """ if not self._parsed: raise ValueError("Data must be parsed before calculating stability indices") # Get required layer properties heights = self.get_param("layer middle") # cm thicknesses = self.get_param("layer thickness") # cm densities = self.get_param("0502") # kg/m³ temps = self.get_param("0503") # °C grain_types = self.get_param("0513") # Code grain_sizes = self.get_param("0512") # mm hardness = self.get_param("0534") # N if isNewton else hand hardness index g = 9.81 # m/s² skier_load = 1500 # N - represents typical skier load # Initialize arrays n_layers = len(heights) # use primary grain type codes (0-9) from swiss F1F2F3 codes primary_gt = np.zeros(n_layers, dtype=np.int16) for (i,gt) in enumerate(grain_types): if gt == -999: primary_gt[i] = -999 elif gt <100 and gt >0: primary_gt[i] = 0 else: primary_gt[i] = int(gt // 100) stress = np.zeros(n_layers) strength = np.zeros(n_layers) Sn38 = np.zeros(n_layers) Sk38 = np.zeros(n_layers) # placeholder for SSI between layers ssi = np.zeros(n_layers - 1) # Calculate penetration depth Pk (m) - C++ compPenetrationDepth depth_m = np.sum(thicknesses) / 100.0 rho_Pk_num = 0.0 dz_Pk = 0.0 cum_depth = 0.0 for j in range(n_layers-1, -1, -1): if cum_depth >= 30.0: break Lm = thicknesses[j] / 100.0 rho_Pk_num += densities[j] * Lm dz_Pk += Lm cum_depth += thicknesses[j] if dz_Pk > 0.0: rho_Pk = rho_Pk_num / dz_Pk Pk = min(0.8 * 43.3 / rho_Pk, depth_m) else: Pk = 0.0 # Precompute reference slope angle parameters (psi_ref = 38°) psi_ref = 38.0 cos_psi = math.cos(math.radians(psi_ref)) sin_psi = math.sin(math.radians(psi_ref)) ski_load = 85.0 * g / 1.7 # Calculate stress, strength, Sn38 and Sk38 using reduced stresses for i in range(n_layers): # overburden stress (Pa) if densities[i] > 0: stress[i] = np.sum(densities[:i+1] * thicknesses[:i+1]) * g / 100.0 # layer strength (shear strength Sig_c2) if self.isNewton: strength[i] = hardness[i] else: strength[i] = 2.91 * np.exp(2.91 * hardness[i]) if temps[i] is not None: tf = max(0.5, 1.0 - 0.01 * abs(temps[i])) strength[i] *= tf if primary_gt[i] in [5, 6]: strength[i] *= 0.7 # reduced stresses sig_n = -stress[i] / 1000.0 sig_s = sig_n * sin_psi / cos_psi # natural stability index (getNaturalStability) if sig_s > 0.0: Sn38[i] = max(0.05, min(6.0, strength[i] / sig_s)) else: Sn38[i] = 6.0 # skier stability index (getLayerSkierStability) depth_lay = np.sum(thicknesses[:i+1]) / 100.0 layer_depth = depth_lay - Pk if layer_depth > 1e-6: delta_sig = 2.0 * ski_load * math.cos(math.radians(psi_ref)) * (math.sin(math.radians(psi_ref))**2) * math.sin(math.radians(2*psi_ref)) delta_sig /= math.pi * layer_depth * cos_psi delta_sig /= 1000.0 Sk38[i] = max(0.05, min(6.0, strength[i] / (sig_s + delta_sig))) else: Sk38[i] = 6.0 # Compute SSI and record lemon counts per interface (C++ initStructuralStabilityIndex) max_stab = 6.0 nmax_lemon = 2 lemons_arr = np.zeros(n_layers - 1, dtype=int) for i in range(n_layers - 1): lemons = 0 if abs(hardness[i] - hardness[i+1]) > 1.5: lemons += 1 if grain_sizes is not None and 2 * abs(grain_sizes[i] - grain_sizes[i+1]) > 0.5: lemons += 1 lemons_arr[i] = lemons val = nmax_lemon - lemons + Sk38[i+1] ssi[i] = max(0.05, min(max_stab, val)) # Determine weak-layer location and values min_Sn38_idx = np.argmin(Sn38) min_Sk38_idx = np.argmin(Sk38) min_ssi_idx = np.argmin(ssi) Swl_ssi = ssi[min_ssi_idx] Swl_lemon = int(lemons_arr[min_ssi_idx]) Swl_Sk38 = Sk38[min_ssi_idx+1] # Profile type: apply surface hoar flag, fallback on error if not (Swl_ssi > 0 and Swl_ssi < 100): profile_type = -1 else: profile_type = 2 if np.any(primary_gt == 6) else 1 # Stability class via SchweizerBellaire2 if Swl_ssi > 0 and Swl_ssi < 100: if Swl_lemon >= 2: stability_class = 1 elif Swl_lemon == 1: if Swl_Sk38 < 0.48: stability_class = 1 elif Swl_Sk38 < 0.71: stability_class = 3 else: stability_class = 5 else: stability_class = 3 else: stability_class = -1 # Format output according to 0530 code spec result = np.array([ profile_type, stability_class, heights[min_ssi_idx], # z_Sdef ssi[min_ssi_idx], # Sdef (SSI) heights[min_Sn38_idx], # z_Sn38 Sn38[min_Sn38_idx], # Sn38 heights[min_Sk38_idx], # z_Sk38 Sk38[min_Sk38_idx] # Sk38 ]) # Store result in weak_layer property self.weak_layer = result self.set_param("0530", result, len(result)) self.set_param("0604", ssi, len(ssi)) return resultMethods
def calculate_stability_indices(self) ‑> numpy.ndarray-
Calculate stability indices (SSI, Sk38, Sn38) for the snowpack. Returns array with format: [profile_type, stability_class, z_Sdef, Sdef, z_Sn38, Sn38, z_Sk38, Sk38]
Based on: - Schweizer et al. (2006) - A threshold sum approach to stability evaluation of manual profiles - Monti et al. (2016) - Snow instability evaluation: calculating the skier-induced stress in a multi-layered snowpack - Schweizer and Jamieson (2003) - Snowpack properties for snow profile analysis
Expand source code
def calculate_stability_indices(self) -> np.ndarray: """ Calculate stability indices (SSI, Sk38, Sn38) for the snowpack. Returns array with format: [profile_type, stability_class, z_Sdef, Sdef, z_Sn38, Sn38, z_Sk38, Sk38] Based on: - Schweizer et al. (2006) - A threshold sum approach to stability evaluation of manual profiles - Monti et al. (2016) - Snow instability evaluation: calculating the skier-induced stress in a multi-layered snowpack - Schweizer and Jamieson (2003) - Snowpack properties for snow profile analysis """ if not self._parsed: raise ValueError("Data must be parsed before calculating stability indices") # Get required layer properties heights = self.get_param("layer middle") # cm thicknesses = self.get_param("layer thickness") # cm densities = self.get_param("0502") # kg/m³ temps = self.get_param("0503") # °C grain_types = self.get_param("0513") # Code grain_sizes = self.get_param("0512") # mm hardness = self.get_param("0534") # N if isNewton else hand hardness index g = 9.81 # m/s² skier_load = 1500 # N - represents typical skier load # Initialize arrays n_layers = len(heights) # use primary grain type codes (0-9) from swiss F1F2F3 codes primary_gt = np.zeros(n_layers, dtype=np.int16) for (i,gt) in enumerate(grain_types): if gt == -999: primary_gt[i] = -999 elif gt <100 and gt >0: primary_gt[i] = 0 else: primary_gt[i] = int(gt // 100) stress = np.zeros(n_layers) strength = np.zeros(n_layers) Sn38 = np.zeros(n_layers) Sk38 = np.zeros(n_layers) # placeholder for SSI between layers ssi = np.zeros(n_layers - 1) # Calculate penetration depth Pk (m) - C++ compPenetrationDepth depth_m = np.sum(thicknesses) / 100.0 rho_Pk_num = 0.0 dz_Pk = 0.0 cum_depth = 0.0 for j in range(n_layers-1, -1, -1): if cum_depth >= 30.0: break Lm = thicknesses[j] / 100.0 rho_Pk_num += densities[j] * Lm dz_Pk += Lm cum_depth += thicknesses[j] if dz_Pk > 0.0: rho_Pk = rho_Pk_num / dz_Pk Pk = min(0.8 * 43.3 / rho_Pk, depth_m) else: Pk = 0.0 # Precompute reference slope angle parameters (psi_ref = 38°) psi_ref = 38.0 cos_psi = math.cos(math.radians(psi_ref)) sin_psi = math.sin(math.radians(psi_ref)) ski_load = 85.0 * g / 1.7 # Calculate stress, strength, Sn38 and Sk38 using reduced stresses for i in range(n_layers): # overburden stress (Pa) if densities[i] > 0: stress[i] = np.sum(densities[:i+1] * thicknesses[:i+1]) * g / 100.0 # layer strength (shear strength Sig_c2) if self.isNewton: strength[i] = hardness[i] else: strength[i] = 2.91 * np.exp(2.91 * hardness[i]) if temps[i] is not None: tf = max(0.5, 1.0 - 0.01 * abs(temps[i])) strength[i] *= tf if primary_gt[i] in [5, 6]: strength[i] *= 0.7 # reduced stresses sig_n = -stress[i] / 1000.0 sig_s = sig_n * sin_psi / cos_psi # natural stability index (getNaturalStability) if sig_s > 0.0: Sn38[i] = max(0.05, min(6.0, strength[i] / sig_s)) else: Sn38[i] = 6.0 # skier stability index (getLayerSkierStability) depth_lay = np.sum(thicknesses[:i+1]) / 100.0 layer_depth = depth_lay - Pk if layer_depth > 1e-6: delta_sig = 2.0 * ski_load * math.cos(math.radians(psi_ref)) * (math.sin(math.radians(psi_ref))**2) * math.sin(math.radians(2*psi_ref)) delta_sig /= math.pi * layer_depth * cos_psi delta_sig /= 1000.0 Sk38[i] = max(0.05, min(6.0, strength[i] / (sig_s + delta_sig))) else: Sk38[i] = 6.0 # Compute SSI and record lemon counts per interface (C++ initStructuralStabilityIndex) max_stab = 6.0 nmax_lemon = 2 lemons_arr = np.zeros(n_layers - 1, dtype=int) for i in range(n_layers - 1): lemons = 0 if abs(hardness[i] - hardness[i+1]) > 1.5: lemons += 1 if grain_sizes is not None and 2 * abs(grain_sizes[i] - grain_sizes[i+1]) > 0.5: lemons += 1 lemons_arr[i] = lemons val = nmax_lemon - lemons + Sk38[i+1] ssi[i] = max(0.05, min(max_stab, val)) # Determine weak-layer location and values min_Sn38_idx = np.argmin(Sn38) min_Sk38_idx = np.argmin(Sk38) min_ssi_idx = np.argmin(ssi) Swl_ssi = ssi[min_ssi_idx] Swl_lemon = int(lemons_arr[min_ssi_idx]) Swl_Sk38 = Sk38[min_ssi_idx+1] # Profile type: apply surface hoar flag, fallback on error if not (Swl_ssi > 0 and Swl_ssi < 100): profile_type = -1 else: profile_type = 2 if np.any(primary_gt == 6) else 1 # Stability class via SchweizerBellaire2 if Swl_ssi > 0 and Swl_ssi < 100: if Swl_lemon >= 2: stability_class = 1 elif Swl_lemon == 1: if Swl_Sk38 < 0.48: stability_class = 1 elif Swl_Sk38 < 0.71: stability_class = 3 else: stability_class = 5 else: stability_class = 3 else: stability_class = -1 # Format output according to 0530 code spec result = np.array([ profile_type, stability_class, heights[min_ssi_idx], # z_Sdef ssi[min_ssi_idx], # Sdef (SSI) heights[min_Sn38_idx], # z_Sn38 Sn38[min_Sn38_idx], # Sn38 heights[min_Sk38_idx], # z_Sk38 Sk38[min_Sk38_idx] # Sk38 ]) # Store result in weak_layer property self.weak_layer = result self.set_param("0530", result, len(result)) self.set_param("0604", ssi, len(ssi)) return result def discard_below_ground(self, discard: bool)-
Sets whether to return data below ground level.
If set to true, only data above ground will be returned, by the getter methods. Otherwise all the data will be returned.
Can be used subsequently.
Args
discard:bool- If True, data below ground level will be discarded. If False, all data will be kept.
Returns
None
Expand source code
def discard_below_ground(self, discard: bool): """ Sets whether to return data below ground level. If set to true, only data above ground will be returned, by the getter methods. Otherwise all the data will be returned. Can be used subsequently. Args: discard (bool): If True, data below ground level will be discarded. If False, all data will be kept. Returns: None """ self._above_ground = discard def get_param(self, code: str, return_missing: bool = False)-
Retrieves the parameter associated with the given code.
Args
code:str- The code of the parameter to retrieve.
return_missing:bool- Will return a missing value (-999) instead of showing a warning when a variable is not present
Returns
The parameter associated with the given code.
Raises
KeyError- If the code is not found in the data.
Expand source code
def get_param(self, code: str, return_missing: bool = False): """ Retrieves the parameter associated with the given code. Args: code (str): The code of the parameter to retrieve. return_missing (bool): Will return a missing value (-999) instead of showing a warning when a variable is not present Returns: The parameter associated with the given code. Raises: KeyError: If the code is not found in the data. """ # need to add these, because they were handled seperately and can also be called through different means possible_codes = list(self.data.keys()) + ["0501", "0530", "0514"] if not code in possible_codes: if return_missing: return np.full(len(self.data["layer middle"]),-999.0) # The Snowpack Missing Value print(f"{code} is invalid") print("available codes are:") print(f"{self.data.keys()}") return if code == "0501": if self._above_ground: mask = np.append(self._height_mask,True) return self.layer_boundaries[mask] else: return self.layer_boundaries if code == "0514": return self.surface_hoar if code == "0530": return self.weak_layer param = self.data[code] if self._above_ground: param = param[self._height_mask] return param def set_param(self, code: str, values: numpy.ndarray, boundaries: int)-
Expand source code
def set_param(self, code: str, values: np.ndarray, boundaries: int): if not self.num_nodes: self.num_nodes = boundaries if code == "0501": self.layer_boundaries = values self._height_mask = self.layer_boundaries >= 0 elif code == "0514": self.surface_hoar = values elif code == "0530": self.weak_layer = values else: self.data[code] = values def toDf(self, CodesToName: Dict = None, integrate: bool = False)-
Converts the Snowpack object to a pandas DataFrame.
In the data frame the heights given in the Snowpack output, which essentially are the layer boundaries, are converted to the middle of the layers and the thickness of the layers, for a clean data frame. The original layer boundaries can easily be computed from that.
The minimum stability indices (weak_layer) and surface hoar information are available as: df.weak_layer and df.surface_hoar. However, this information will not be passed on when merging… the dataframe as pandas does not handle this yet.
Args
CodesToName:Dict, optional- A dictionary mapping column data codes to column names.
Returns
DataFrame- The Snowpack data as a pandas DataFrame.
Expand source code
def toDf(self, CodesToName: Dict = None, integrate: bool = False): """ Converts the Snowpack object to a pandas DataFrame. In the data frame the heights given in the Snowpack output, which essentially are the layer boundaries, are converted to the middle of the layers and the thickness of the layers, for a clean data frame. The original layer boundaries can easily be computed from that. The minimum stability indices (weak_layer) and surface hoar information are available as: df.weak_layer and df.surface_hoar. However, this information will not be passed on when merging... the dataframe as pandas does not handle this yet. Args: CodesToName (Dict, optional): A dictionary mapping column data codes to column names. Returns: DataFrame: The Snowpack data as a pandas DataFrame. """ df = pd.DataFrame(self.data) cols = ( ["layer middle"] + ["layer thickness"] + [ col for col in df.columns if col != "layer middle" and col != "layer thickness" ] ) df = df[cols] if self._above_ground: df = df[self._height_mask] if CodesToName: df.rename(columns=CodesToName, inplace=True) if integrate: df.weak_layer = None df.surface_hoar = None if self.surface_hoar is not None: df["surface hoar"] = [ np.nan for _ in range(df["layer middle"].size - 1) ] + [self.surface_hoar] if self.weak_layer is not None: df["weak layer"] = [self.weak_layer] + [ np.nan for _ in range(df["layer middle"].size - 1) ] else: warnings.filterwarnings('ignore', 'Pandas doesn\'t allow columns to be created via a new attribute name') df.weak_layer = self.weak_layer df.surface_hoar = self.surface_hoar return df def write_pro_file(self, filepath: str, station_params: dict = None, profile_date:= None) -
Writes the Snowpack data to a .pro file.
Args
filepath:str- Path to save the .pro file
station_params:dict, optional- Dictionary containing station parameters. Keys should be: StationName, Latitude, Longitude, Altitude, SlopeAngle, SlopeAzi If not provided, default values will be used.
Returns
None
Expand source code
def write_pro_file(self, filepath: str, station_params: dict = None, profile_date: datetime = None): """ Writes the Snowpack data to a .pro file. Args: filepath (str): Path to save the .pro file station_params (dict, optional): Dictionary containing station parameters. Keys should be: StationName, Latitude, Longitude, Altitude, SlopeAngle, SlopeAzi If not provided, default values will be used. Returns: None """ if not self._parsed: raise ValueError("Snowpack data has not been parsed yet.") # Default station parameters if not provided default_params = { "StationName": "Unknown", "Latitude": 0.0, "Longitude": 0.0, "Altitude": 0.0, "SlopeAngle": 0, "SlopeAzi": 0 } station_params = station_params or default_params with open(filepath, 'w') as f: # Write station parameters f.write("[STATION_PARAMETERS]\n") for key, value in station_params.items(): if key == "SlopeAzi" and value == None: value = 0 f.write(f"{key}= {value}\n") f.write("\n") # Write header section f.write("[HEADER]\n") f.write("0500,Date\n") f.write("0501,nElems,height [> 0: top, < 0: bottom of elem.] (cm)\n") f.write("0502,nElems,element density (kg m-3)\n") f.write("0503,nElems,element temperature (degC)\n") f.write("0504,nElems,element ID (1)\n") f.write("0506,nElems,liquid water content by volume (%)\n") f.write("0508,nElems,dendricity (1)\n") f.write("0509,nElems,sphericity (1)\n") f.write("0510,nElems,coordination number (1)\n") f.write("0511,nElems,bond size (mm)\n") f.write("0512,nElems,grain size (mm)\n") f.write("0513,nElems,grain type (Swiss Code F1F2F3)\n") f.write("0514,3,grain type, grain size (mm), and density (kg m-3) of SH at surface\n") f.write("0515,nElems,ice volume fraction (%)\n") f.write("0516,nElems,air volume fraction (%)\n") f.write("0517,nElems,stress in (kPa)\n") f.write("0518,nElems,viscosity (GPa s)\n") f.write("0519,nElems,soil volume fraction (%)\n") f.write("0520,nElems,temperature gradient (K m-1)\n") f.write("0521,nElems,thermal conductivity (W K-1 m-1)\n") f.write("0522,nElems,absorbed shortwave radiation (W m-2)\n") f.write("0523,nElems,viscous deformation rate (1.e-6 s-1)\n") f.write("0530,8,position (cm) and minimum stability indices:\n") f.write("profile type, stability class, z_Sdef, Sdef, z_Sn38, Sn38, z_Sk38, Sk38\n") f.write("0531,nElems,deformation rate stability index Sdef\n") f.write("0532,nElems,natural stability index Sn38\n") f.write("0533,nElems,stability index Sk38\n") f.write("0534,nElems,hand hardness either (N) or index steps (1)\n") f.write("0535,nElems,optical equivalent grain size (mm)\n") f.write("0601,nElems,snow shear strength (kPa)\n") f.write("0602,nElems,grain size difference (mm)\n") f.write("0603,nElems,hardness difference (1)\n") f.write("0604,nElems,ssi\n") f.write("0605,nElems,inverse texture index ITI (Mg m-4)\n") f.write("0606,nElems,critical cut length (m)\n") f.write("\n") # Write data section f.write("[DATA]\n") # Write date current_date = profile_date.strftime("%d.%m.%Y %H:%M:%S") f.write(f"0500,{current_date}\n") # Write layer boundaries boundaries = self.get_param("0501") f.write(f"0501,{len(boundaries)},{','.join(map(str, boundaries))}\n") # Write all other parameters param_codes = [code for code in self.data.keys() if code not in ["layer middle", "layer thickness"]] for code in sorted(param_codes): values = self.get_param(code) if values is not None: if code == "0534" and not self.isNewton: # snowpack only gives 0.5 and hand hardness is negative for (i,value) in enumerate(values): if math.isclose(value%1, 1/3) or math.isclose(value%1, 2/3): value = math.floor(value) + 0.5 values[i] = -1 * value if code == "0513": values = np.append(values, -999) # for some reason we need surface here if np.any(values <100) and np.any(values >0): out_vals = [f"{val}" if val > 100 or val < 0 else f"000" for val in values] values = out_vals f.write(f"{code},{len(values)},{','.join(map(str, values))}\n") # Write special codes (surface hoar and weak layer) if self.surface_hoar is not None: f.write(f"0514,3,{','.join(map(str, self.surface_hoar))}\n") if self.weak_layer is not None: f.write(f"0530,8,{','.join(map(str, self.weak_layer))}\n")
class SnowpackReader (filename: str)-
A class for reading and parsing Snowpack files.
Initializes a SnowpackReader object.
The data and units are saved with the data codes as keys.
So to get a data column for a parameter use data[date][code] to get the data at a specific date and units[code] to get the unit.
Conversion from Codes to names and vice versa is available through the DataCodes and NamesToCodes dictionaries.
Args
filename:str- The path to the Snowpack file.
meaningful_names:bool, optional- If True, the data codes will be replaced with meaningful names given in the header, in the returned dataframe
Attributes
filename:str- The path to the Snowpack file.
metadata:dict, optional- A dictionary containing metadata information. Initialized as None.
DataCodes:dict- A dictionary containing default data codes. Initialized with get_default_codes().
data:dict- A dictionary to store the data read from the Snowpack file, each date is a key, and profiles are safed as Snowpack objects
units:dict- A dictionary to store the units of the data.
It is advised to use the Classes getter methods to get the data:
Methods
get_profile_on(date: datetime.datetime) -> Snowpack: Returns the data for the given date. get_all_profiles() -> List[Snowpack]: Returns all the profiles. get_var(code:str) -> List[np.ndarray]: Returns the data for the given code. CodeToName(code:str) -> str: Returns the name of the code.
If you do not want to have data below ground level, use the discard_below_ground method.
Methods
discard_below_ground(discard: bool): Getter and writers only use the above ground data.
To write to another file format use the toCSV and toHDF5 methods. It is advised to use the HDF5 format as it is much more robust Methods: toCSV(filename:str, integrate:bool=False): Writes the data to a CSV file. toHDF5(filename:str, integrate:bool=False): Writes the data to a HDF5 file.
Reader Methods to read the data from a file written by this class: readHDF5(filename:str) -> dict, dict: Reads the data from a HDF5 file. fromCSV(filename:str) -> dict, dict: Reads the data from a CSV file written by the toCSV method.
Expand source code
class SnowpackReader: """A class for reading and parsing Snowpack files.""" def __init__(self, filename: str): """ Initializes a SnowpackReader object. The data and units are saved with the data codes as keys. So to get a data column for a parameter use data[date][code] to get the data at a specific date and units[code] to get the unit. Conversion from Codes to names and vice versa is available through the DataCodes and NamesToCodes dictionaries. Args: filename (str): The path to the Snowpack file. meaningful_names (bool, optional): If True, the data codes will be replaced with meaningful names given in the header, in the returned dataframe Attributes: filename (str): The path to the Snowpack file. metadata (dict, optional): A dictionary containing metadata information. Initialized as None. DataCodes (dict): A dictionary containing default data codes. Initialized with get_default_codes(). data (dict): A dictionary to store the data read from the Snowpack file, each date is a key, and profiles are safed as Snowpack objects units (dict): A dictionary to store the units of the data. It is advised to use the Classes getter methods to get the data: Methods: get_profile_on(date: datetime.datetime) -> Snowpack: Returns the data for the given date. get_all_profiles() -> List[Snowpack]: Returns all the profiles. get_var(code:str) -> List[np.ndarray]: Returns the data for the given code. CodeToName(code:str) -> str: Returns the name of the code. If you do not want to have data below ground level, use the discard_below_ground method. Methods: discard_below_ground(discard: bool): Getter and writers only use the above ground data. To write to another file format use the toCSV and toHDF5 methods. It is advised to use the HDF5 format as it is much more robust Methods: toCSV(filename:str, integrate:bool=False): Writes the data to a CSV file. toHDF5(filename:str, integrate:bool=False): Writes the data to a HDF5 file. Reader Methods to read the data from a file written by this class: readHDF5(filename:str) -> dict, dict: Reads the data from a HDF5 file. fromCSV(filename:str) -> dict, dict: Reads the data from a CSV file written by the toCSV method. """ self.filename = filename # file content self.metadata = {} self.DataCodes = {} self.units = {} self.data:Dict[datetime.datetime,Snowpack] = {} # internal help variables self.current_date = None self._old_hardness = False self._read_file() self._clear_mapping() self._parse_profiles() def _read_file(self): """Reads and parses the Snowpack file""" try: with open(self.filename, 'r') as file: self._parse_file(file) except (FileNotFoundError, PermissionError) as e: raise Exception(f"Error opening file: {e}") # Handle error appropriately def _parse_file(self, file): """ Parses the Snowpack file line by line. Args: file (file object): The file object to parse. """ current_section = None for line in file: line = line.strip() if not line or line.startswith('#'): # Skip empty or comment lines continue current_section, new_section = self._determine_section(line, current_section) if new_section: continue self._parse_sections(line, current_section) def _determine_section(self, line: str, current_section: str) -> str: """ Determines the current section based on the line content. Args: line (str): The line to analyze. current_section (str): The current section. Returns: str: The updated current section. """ if "STATION_PARAMETERS" in line: return 'metadata', True elif "HEADER" in line: return 'header', True elif "DATA" in line: return 'data', True return current_section, False def _parse_header(self, line:str, current_section:str): """ Parses the header sections [STATION_PARAMETERS] and [HEADER]. Parameters: - line (str): The line to parse. - current_section (str): The current section. """ if current_section == "metadata": key, value = line.split('=') self.metadata[key.strip()] = value.strip() elif current_section == 'header': line_vals = line.split(',') if line_vals[0] == '0500' and line_vals[1] == 'Date': self.DataCodes["0500"] = "Date" elif line_vals[0] == '0530': self.DataCodes["0530"] = "minimum stability indices (cm)" elif line_vals[0] == '0514': self.DataCodes["0514"] = "surface hoar ([Swiss Code, mm, kgm-3])" elif line_vals[0] == '0534': self._old_hardness = line_vals[2] == OLD_HEADER else: if len(line_vals) >= 3: self.DataCodes[line_vals[0]] = ",".join(line_vals[2:]) else: self.DataCodes[line_vals[0]] = line_vals[2] def _parse_data_section(self, line:str): """Parses the [DATA] section""" vals = [val.strip() for val in line.split(',')] code = vals[0] if len(code)!=4: raise ValueError("Invalid data code: {}".format(code)); if vals[1] == "nElems" or vals[1] == "grain": print("Incorrect header section, continuing anyways.") self._parse_header(line, "header") # parse the data lines if code == '0500': self.current_date = datetime.datetime.strptime(vals[1], '%d.%m.%Y %H:%M:%S') self.data[self.current_date] = Snowpack() return None n_elements = int(vals[1]) elements = vals[2:] elements_arr = np.array(elements).astype(float) self.data[self.current_date].set_param(code, elements_arr, n_elements) def _parse_sections(self, line:str, current_section:str): if current_section == "metadata" or current_section == "header": self._parse_header(line, current_section) elif current_section == "data": self._parse_data_section(line) else: raise ValueError("Invalid section: {}".format(current_section)) def _clear_mapping(self): """Clears the mapping of data codes""" for k, v in self.DataCodes.items(): if "(" in v and ")" in v: unit = v.split("(")[1].split(")")[0] else: unit = "b.E." self.units[k] = unit self.DataCodes[k] = v.split("(")[0].strip() self.DataCodes = {k : v.split("[")[0].strip() for k, v in self.DataCodes.items()} self.NamesToCodes = {v: k for k, v in self.DataCodes.items()} def _parse_profiles(self): """Parses the profiles""" for date in self.data: self.data[date]._parse_data(self._old_hardness) def update_name_of_code(self, code: str, name: str): """Updates the name of the code""" if code not in self.DataCodes: raise ValueError(f"{code} not found. Available codes are: {', '.join(self.DataCodes.keys())}") self.DataCodes[code] = name self.NamesToCodes[name] = code def name_to_code(self, name: str) -> str: """Returns the code for the given name""" try: return self.NamesToCodes[name] except KeyError: raise KeyError(f"{name} not found. Available names are: {', '.join(self.NamesToCodes.keys())}") def discard_below_ground(self, discard: bool = True): """Will only return the above ground data. Args: discard (bool): If True, below ground data will be discarded. If False, below ground data will be included. """ for date in self.data: self.data[date].discard_below_ground(discard) def get_profile_nr(self, nr: int) -> Snowpack: """Returns the profile for the given number""" return self.data[self.get_all_dates()[nr]] def get_profile_on(self, date: datetime.datetime) -> Snowpack: """ Returns the data for the given date. Args: date (datetime.datetime): The date for which to retrieve the data. Returns: pandas.DataFrame: The snowpack profile for the given date. """ if date in self.get_all_dates(): return self.data[date] else: return None def get_all_dates(self) -> List[datetime.datetime]: """Returns all the dates in the data. Returns: list: A list of all the dates in the data. """ return list(self.data.keys()) def get_all_profiles(self)-> List[Snowpack]: """ Returns all the profiles. Returns: list: returns a list of profiles for each date. """ return [self.get_profile_on(date) for date in self.get_all_dates()] def get_var(self, code:str, return_missing:bool = False ): """Returns the data for the given code""" return [self.get_profile_on(date).get_param(code, return_missing) for date in self.get_all_dates()] def toCSV(self, filename:str, integrate:bool=False): """Writes the data to a CSV file Args: filename (str): The name of the CSV file to write the data to. integrate (bool, optional): Whether to integrate the special information into the dataframe or keep it as additional information. Special information is 0514: surface hoar and 0530: minimum stabiliy index """ # first write the metadata with open(filename, 'w') as file: file.write("# This file was created with the SnowpackReader library, which also provides a method to read this file.\n") file.write("# [METADATA]\n") for key, value in self.metadata.items(): file.write("# {} = {}\n".format(key, value)) file.write("# [DATA]\n") for date in self.get_all_dates(): file.write("# Date = {}\n".format(date.strftime('%d.%m.%Y %H:%M:%S'))) profile = self.get_profile_on(date) if profile.weak_layer is not None: file.write(f"# 0530, weak layer, {', '.join(map(str, profile.weak_layer))}\n") if profile.surface_hoar is not None: file.write(f"# 0514, surface hoar, {', '.join(map(str, profile.weak_layer))}\n") profile.toDf(integrate=integrate).to_csv(file, index=False, mode='a') def toHDF5(self, filename:str, integrate:bool = False): """Writes the data to a HDF5 file Args: filename (str): The name of the HDF5 file to write the data to. integrate (bool, optional): Whether to integrate the special information into the dataframe or keep it as additional information. Special information is 0514: surface hoar and 0530: minimum stabiliy index """ with h5py.File(filename, 'w') as file: for key, value in self.metadata.items(): file.attrs[key] = value for date in self.get_all_dates(): group = file.create_group(date.strftime('%d.%m.%Y %H:%M:%S')) profile = self.get_profile_on(date) if profile.weak_layer is not None: group.attrs['weak_layer'] = profile.weak_layer if profile.surface_hoar is not None: group.attrs['surface'] = profile.surface_hoar df = profile.toDf(integrate=integrate) col_names = list(df.columns) arr = df.to_numpy() group.attrs["fields"] = col_names group.create_dataset("data", data=arr) def __str__(self): ss = "PRO File:\n" ss += "Filename: {}\n".format(self.filename) ss += "Metadata: {}\n".format(self.metadata) ss += "Data codes: {}\n".format(self.DataCodes) ss += "Units: {}\n".format(self.units) ss += "Dates: {}\n".format(self.get_all_dates()) return ss def info(self): print(self)Methods
def discard_below_ground(self, discard: bool = True)-
Will only return the above ground data.
Args
discard:bool- If True, below ground data will be discarded. If False, below ground data will be included.
Expand source code
def discard_below_ground(self, discard: bool = True): """Will only return the above ground data. Args: discard (bool): If True, below ground data will be discarded. If False, below ground data will be included. """ for date in self.data: self.data[date].discard_below_ground(discard) def get_all_dates(self) ‑> List[datetime.datetime]-
Returns all the dates in the data.
Returns
list- A list of all the dates in the data.
Expand source code
def get_all_dates(self) -> List[datetime.datetime]: """Returns all the dates in the data. Returns: list: A list of all the dates in the data. """ return list(self.data.keys()) def get_all_profiles(self) ‑> List[snowpat.snowpackreader.Snowpack.Snowpack]-
Returns all the profiles.
Returns
list- returns a list of profiles for each date.
Expand source code
def get_all_profiles(self)-> List[Snowpack]: """ Returns all the profiles. Returns: list: returns a list of profiles for each date. """ return [self.get_profile_on(date) for date in self.get_all_dates()] def get_profile_nr(self, nr: int) ‑> snowpat.snowpackreader.Snowpack.Snowpack-
Returns the profile for the given number
Expand source code
def get_profile_nr(self, nr: int) -> Snowpack: """Returns the profile for the given number""" return self.data[self.get_all_dates()[nr]] def get_profile_on(self, date: datetime.datetime) ‑> snowpat.snowpackreader.Snowpack.Snowpack-
Returns the data for the given date.
Args
date:datetime.datetime- The date for which to retrieve the data.
Returns
pandas.DataFrame- The snowpack profile for the given date.
Expand source code
def get_profile_on(self, date: datetime.datetime) -> Snowpack: """ Returns the data for the given date. Args: date (datetime.datetime): The date for which to retrieve the data. Returns: pandas.DataFrame: The snowpack profile for the given date. """ if date in self.get_all_dates(): return self.data[date] else: return None def get_var(self, code: str, return_missing: bool = False)-
Returns the data for the given code
Expand source code
def get_var(self, code:str, return_missing:bool = False ): """Returns the data for the given code""" return [self.get_profile_on(date).get_param(code, return_missing) for date in self.get_all_dates()] def info(self)-
Expand source code
def info(self): print(self) def name_to_code(self, name: str) ‑> str-
Returns the code for the given name
Expand source code
def name_to_code(self, name: str) -> str: """Returns the code for the given name""" try: return self.NamesToCodes[name] except KeyError: raise KeyError(f"{name} not found. Available names are: {', '.join(self.NamesToCodes.keys())}") def toCSV(self, filename: str, integrate: bool = False)-
Writes the data to a CSV file
Args
filename:str- The name of the CSV file to write the data to.
integrate:bool, optional- Whether to integrate the special information into the dataframe or keep it as additional information.
Special information is 0514: surface hoar and 0530: minimum stabiliy index
Expand source code
def toCSV(self, filename:str, integrate:bool=False): """Writes the data to a CSV file Args: filename (str): The name of the CSV file to write the data to. integrate (bool, optional): Whether to integrate the special information into the dataframe or keep it as additional information. Special information is 0514: surface hoar and 0530: minimum stabiliy index """ # first write the metadata with open(filename, 'w') as file: file.write("# This file was created with the SnowpackReader library, which also provides a method to read this file.\n") file.write("# [METADATA]\n") for key, value in self.metadata.items(): file.write("# {} = {}\n".format(key, value)) file.write("# [DATA]\n") for date in self.get_all_dates(): file.write("# Date = {}\n".format(date.strftime('%d.%m.%Y %H:%M:%S'))) profile = self.get_profile_on(date) if profile.weak_layer is not None: file.write(f"# 0530, weak layer, {', '.join(map(str, profile.weak_layer))}\n") if profile.surface_hoar is not None: file.write(f"# 0514, surface hoar, {', '.join(map(str, profile.weak_layer))}\n") profile.toDf(integrate=integrate).to_csv(file, index=False, mode='a') def toHDF5(self, filename: str, integrate: bool = False)-
Writes the data to a HDF5 file
Args
filename:str- The name of the HDF5 file to write the data to.
integrate:bool, optional- Whether to integrate the special information into the dataframe or keep it as additional information.
Special information is 0514: surface hoar and 0530: minimum stabiliy index
Expand source code
def toHDF5(self, filename:str, integrate:bool = False): """Writes the data to a HDF5 file Args: filename (str): The name of the HDF5 file to write the data to. integrate (bool, optional): Whether to integrate the special information into the dataframe or keep it as additional information. Special information is 0514: surface hoar and 0530: minimum stabiliy index """ with h5py.File(filename, 'w') as file: for key, value in self.metadata.items(): file.attrs[key] = value for date in self.get_all_dates(): group = file.create_group(date.strftime('%d.%m.%Y %H:%M:%S')) profile = self.get_profile_on(date) if profile.weak_layer is not None: group.attrs['weak_layer'] = profile.weak_layer if profile.surface_hoar is not None: group.attrs['surface'] = profile.surface_hoar df = profile.toDf(integrate=integrate) col_names = list(df.columns) arr = df.to_numpy() group.attrs["fields"] = col_names group.create_dataset("data", data=arr) def update_name_of_code(self, code: str, name: str)-
Updates the name of the code
Expand source code
def update_name_of_code(self, code: str, name: str): """Updates the name of the code""" if code not in self.DataCodes: raise ValueError(f"{code} not found. Available codes are: {', '.join(self.DataCodes.keys())}") self.DataCodes[code] = name self.NamesToCodes[name] = code