Module snowpat.icsv
Expand source code
from .icsv_file import iCSVFile
from .application_profile import iCSVProfiles, append_timepoint
from .factory import read, from_smet
from .header import MetaDataSection, FieldsSection
__all__ = ["iCSVFile", "read", "from_smet", "MetaDataSection", "FieldsSection", "iCSVProfiles", "append_timepoint"]
Sub-modules
snowpat.icsv.application_profilesnowpat.icsv.factorysnowpat.icsv.headersnowpat.icsv.icsv_filesnowpat.icsv.utility
Functions
def append_timepoint(filename: str, timestamp: datetime.datetime, data: pandas.core.frame.DataFrame, field_delimiter: str = ',')-
Appends a new timepoint to the iCSV file.
Args
filename:str- The name of the file to append to.
timestamp:datetime.datetime- The timestamp of the new timepoint.
data:pd.DataFrame- The data to append.
Returns
None
Expand source code
def append_timepoint(filename: str, timestamp: datetime.datetime, data: pd.DataFrame, field_delimiter: str = ","): """ Appends a new timepoint to the iCSV file. Args: filename (str): The name of the file to append to. timestamp (datetime.datetime): The timestamp of the new timepoint. data (pd.DataFrame): The data to append. Returns: None """ with open(filename, 'a') as file: file.write(f"# [DATE={timestamp.isoformat()}]\n") data.to_csv(file, mode='a', index=False, header=False, sep=field_delimiter) def from_smet(smet: SMETFile) ‑> iCSVFile-
Converts an SMETFile object to an iCSVFile object.
Args
smet:SMETFile- The SMETFile object to convert.
Returns
iCSVFile- The converted iCSVFile object.
Expand source code
def from_smet(smet: SMETFile) -> iCSVFile: """ Converts an SMETFile object to an iCSVFile object. Args: smet (SMETFile): The SMETFile object to convert. Returns: iCSVFile: The converted iCSVFile object. """ icsv = iCSVFile() _set_fields_and_location(icsv, smet) _set_metadata(icsv, smet) icsv.data = smet.data _check_validity_and_parse_geometry(icsv, icsv.data.shape[1]) return icsv def read(filename: str) ‑> iCSVFile-
Reads an iCSV file and returns an iCSVFile object (or the respective application profile specific object).
Args
filename:str- The path to the iCSV file.
Returns
iCSVFile/ApplicationProfile: An iCSVFile or subclass object representing the contents of the file. The iCSVFile object has the following attributes: - metadata: The metadata section of the iCSV file. access attributes via metadata.get_attribute("key") - fields: The fields section of the iCSV file. access attributes via fields.get_attribute("key") - geometry: The geometry section of the iCSV file. get the location via geometry.get_location() - data: The data section of the iCSV file. As a pandas DataFrame. - filename: The name of the iCSV file. - skip_lines: The number of lines to skip when reading the file.
Expand source code
def read(filename: str) -> iCSVFile: """ Reads an iCSV file and returns an iCSVFile object (or the respective application profile specific object). Args: filename (str): The path to the iCSV file. Returns: iCSVFile/ApplicationProfile: An iCSVFile or subclass object representing the contents of the file. The iCSVFile object has the following attributes: - metadata: The metadata section of the iCSV file. access attributes via metadata.get_attribute("key") - fields: The fields section of the iCSV file. access attributes via fields.get_attribute("key") - geometry: The geometry section of the iCSV file. get the location via geometry.get_location() - data: The data section of the iCSV file. As a pandas DataFrame. - filename: The name of the iCSV file. - skip_lines: The number of lines to skip when reading the file. """ firstline = open(filename).readline().rstrip() if firstline in FIRSTLINES_SNOWPROFILE: return iCSVProfiles(filename) elif firstline in FIRSTLINES: return iCSVFile(filename) else: raise ValueError("Not an iCSV file")
Classes
class FieldsSection-
A class used to represent the fields section of an iCSV file.
Attributes
fields:list- List of fields.
recommended_fields:list- Fields that are recommended to be present in the fields section.
other_fields:list- Fields that are not recommended to be present in the fields section.
Methods
check_validity(n_cols: int): Performs a sanity check. __str__(): Returns a string representation of the fields. set_attribute(attribute_name: str, value: list): Sets an attribute. get_attribute(attribute_name: str): Returns an attribute. all_fields: Returns all fields. miscellaneous_fields: Returns all fields that are not required.
Expand source code
class FieldsSection: """ A class used to represent the fields section of an iCSV file. Attributes: fields (list): List of fields. recommended_fields (list): Fields that are recommended to be present in the fields section. other_fields (list): Fields that are not recommended to be present in the fields section. Methods: check_validity(n_cols: int): Performs a sanity check. __str__(): Returns a string representation of the fields. set_attribute(attribute_name: str, value: list): Sets an attribute. get_attribute(attribute_name: str): Returns an attribute. all_fields: Returns all fields. miscellaneous_fields: Returns all fields that are not required. """ def __init__(self): self.fields = [] self.recommended_fields = { "units_multiplier": [], "units": [], "long_name": [], "standard_name": [], } self.other_fields = {} def __str__(self): recommended_fields_string = "\n".join( f"{key} : {value}" for key, value in self.recommended_fields.items() if value ) other_fields_string = "\n".join( f"{key} : {value}" for key, value in self.other_fields.items() if value ) return f"Fields: {self.fields}\nRecommended Fields:\n{recommended_fields_string}\nOther Fields:\n{other_fields_string}" def __eq__(self, value: object) -> bool: if not isinstance(value, FieldsSection): return False for attr in ["fields", "recommended_fields", "other_fields"]: self_dict = getattr(self, attr) value_dict = getattr(value, attr) if self_dict != value_dict: return False return True def check_validity(self, n_cols: int): if not self.fields: raise ValueError("No fields provided") if len(self.fields) != n_cols: raise ValueError("Number of fields does not match the number of columns") for key, val in self.recommended_fields.items(): if val and len(self.recommended_fields[key]) != n_cols: raise ValueError( f"Number of {key} does not match the number of columns" ) for key, val in self.other_fields.items(): if val and len(self.other_fields[key]) != n_cols: raise ValueError( f"Number of {key} does not match the 
number of columns" ) def set_attribute(self, attribute_name, value: list): value = [float(val) if is_number(val) else val for val in value] if attribute_name == "fields": self.fields = value elif attribute_name in self.recommended_fields: self.recommended_fields[attribute_name] = value else: self.other_fields[attribute_name] = value def get_attribute(self, attribute_name): if attribute_name == "fields": return self.fields elif attribute_name in self.recommended_fields: return self.recommended_fields[attribute_name] else: if attribute_name in self.other_fields: return self.other_fields[attribute_name] return None @property def all_fields(self): return { "fields": self.fields, **{k: v for k, v in self.recommended_fields.items() if v}, **{k: v for k, v in self.other_fields.items() if v}, } @property def miscalleneous_fields(self): return { **{k: v for k, v in self.recommended_fields.items() if v}, **{k: v for k, v in self.other_fields.items() if v}, }Instance variables
var all_fields-
Expand source code
@property
def all_fields(self):
    """Field names plus every non-empty per-field attribute, merged into one dict."""
    combined = {"fields": self.fields}
    combined.update((k, v) for k, v in self.recommended_fields.items() if v)
    combined.update((k, v) for k, v in self.other_fields.items() if v)
    return combined


@property
def miscalleneous_fields(self):
    """Every non-empty per-field attribute, excluding the field names themselves."""
    combined = {}
    combined.update((k, v) for k, v in self.recommended_fields.items() if v)
    combined.update((k, v) for k, v in self.other_fields.items() if v)
    return combined
Methods
def check_validity(self, n_cols: int)-
Expand source code
def check_validity(self, n_cols: int):
    """Raise ValueError unless the declared fields match n_cols columns."""
    if not self.fields:
        raise ValueError("No fields provided")
    if len(self.fields) != n_cols:
        raise ValueError("Number of fields does not match the number of columns")
    # Every provided attribute vector must be exactly one entry per column.
    for key, val in self.recommended_fields.items():
        if val and len(val) != n_cols:
            raise ValueError(f"Number of {key} does not match the number of columns")
    for key, val in self.other_fields.items():
        if val and len(val) != n_cols:
            raise ValueError(f"Number of {key} does not match the number of columns")


def get_attribute(self, attribute_name):
    """Look up an attribute vector by name; None when unknown."""
    if attribute_name == "fields":
        return self.fields
    if attribute_name in self.recommended_fields:
        return self.recommended_fields[attribute_name]
    return self.other_fields.get(attribute_name)


def set_attribute(self, attribute_name, value: list):
    """Store an attribute vector, converting numeric-looking entries to float."""
    converted = [float(entry) if is_number(entry) else entry for entry in value]
    if attribute_name == "fields":
        self.fields = converted
    elif attribute_name in self.recommended_fields:
        self.recommended_fields[attribute_name] = converted
    else:
        self.other_fields[attribute_name] = converted
class MetaDataSection-
A class used to represent the metadata section of an iCSV file.
Attributes
required_attributes:dict- Attributes that are required to be present in the metadata.
recommended_attributes:dict- Attributes that are recommended to be present in the metadata.
acdd_metadata:dict- Metadata that is part of the ACDD standard.
other_metadata:dict- Metadata that is not part of the ACDD standard.
Methods
check_validity(): Performs a sanity check. __str__(): Returns a string representation of the metadata. set_attribute(attribute_name: str, value: any): Sets an attribute. get_attribute(attribute_name: str): Returns an attribute. metadata: Returns all metadata. join(other: MetaDataSection): Joins two metadata sections.
Expand source code
class MetaDataSection:
    """
    A class used to represent the metadata section of an iCSV file.

    Attributes:
        required_attributes (dict): Attributes that are required to be present in the metadata.
        recommended_attributes (dict): Attributes that are recommended to be present in the metadata.
        acdd_metadata (ACDDMetadata): Metadata that is part of the ACDD standard.
        other_metadata (dict): Metadata that is not part of the ACDD standard.

    Methods:
        check_validity(): Performs a sanity check.
        __str__(): Returns a string representation of the metadata.
        set_attribute(attribute_name: str, value: any): Sets an attribute.
        get_attribute(attribute_name: str): Returns an attribute.
        metadata: Returns all metadata.
        join(other: MetaDataSection): Joins two metadata sections.
    """

    def __init__(self):
        # None marks an attribute that has not been provided yet.
        self.required_attributes = {
            "field_delimiter": None,
            "geometry": None,
            "srid": None,
        }
        self.recommended_attributes = {
            "station_id": None,
            "nodata": None,
            "timezone": None,
            "doi": None,
            "timestamp_meaning": None,
        }
        self.acdd_metadata = ACDDMetadata()
        self.other_metadata = {}

    def __str__(self):
        # Only attributes that were actually set are rendered.
        required_attribute_string = "\n".join(
            f"{key} : {value}"
            for key, value in self.required_attributes.items()
            if value is not None
        )
        recommended_attribute_string = "\n".join(
            f"{key} : {value}"
            for key, value in self.recommended_attributes.items()
            if value is not None
        )
        other_metadata_string = "\n".join(
            f"{key} : {value}"
            for key, value in self.other_metadata.items()
            if value is not None
        )
        return f"METADATA:\nRequired:\n{required_attribute_string}\nRecommended:\n{recommended_attribute_string}\n{self.acdd_metadata}\nOther Metadata:\n{other_metadata_string}"

    def __eq__(self, value: object) -> bool:
        """Compare two metadata sections; only keys set on BOTH sides are compared."""
        if not isinstance(value, MetaDataSection):
            return False
        for attr in ["required_attributes", "recommended_attributes", "other_metadata"]:
            self_dict = getattr(self, attr)
            value_dict = getattr(value, attr)
            # Keys missing or unset on either side are tolerated (lenient equality).
            common_keys = self_dict.keys() & value_dict.keys()
            for key in common_keys:
                if self_dict[key] is not None and value_dict[key] is not None:
                    if self_dict[key] != value_dict[key]:
                        return False
        return self.acdd_metadata == value.acdd_metadata

    def init_application(self, application_profile):
        """Hook for application profiles; the base metadata section does nothing here."""
        pass

    def check_validity(self):
        """Raise ValueError if any required attribute is still unset."""
        for key, value in self.required_attributes.items():
            if value is None:
                raise ValueError(f"Required attribute {key} is missing")

    def set_attribute(self, attribute_name, value):
        """Store value under attribute_name; numeric-looking values become float."""
        if is_number(value):
            value = float(value)
        if attribute_name in self.required_attributes:
            self.required_attributes[attribute_name] = value
        elif attribute_name in self.recommended_attributes:
            self.recommended_attributes[attribute_name] = value
        # NOTE(review): this is NOT an elif chain — even required/recommended values
        # are offered to the ACDD handler and, when it rejects them, stored in
        # other_metadata as well. Confirm this duplication is intended.
        if not self.acdd_metadata.set_attribute(attribute_name, value):
            self.other_metadata[attribute_name] = value

    def get_attribute(self, attribute_name):
        """Return the value for attribute_name, searching all sections; None if absent."""
        if attribute_name in self.required_attributes:
            return self.required_attributes[attribute_name]
        elif attribute_name in self.recommended_attributes:
            return self.recommended_attributes[attribute_name]
        else:
            # NOTE(review): the ACDD getter is called twice (truth test + return);
            # presumably it is a cheap, side-effect-free lookup — confirm.
            if self.acdd_metadata.get_attribute(attribute_name):
                return self.acdd_metadata.get_attribute(attribute_name)
            if attribute_name in self.other_metadata:
                return self.other_metadata[attribute_name]
            return None

    def join(self, other: "MetaDataSection"):
        """Merge attributes from other into self; conflicting values are kept and reported."""
        for attr_dict in [
            other.required_attributes,
            other.recommended_attributes,
            other.other_metadata,
        ]:
            for attribute, value in attr_dict.items():
                self_value = self.get_attribute(attribute)
                if value and not self_value:
                    self.set_attribute(attribute, value)
                elif value and self_value != value:
                    # Conflicts are only reported, not resolved; self keeps its value.
                    print(
                        f"Attribute {attribute} is different in both MetaDataSection objects"
                    )
        self.acdd_metadata.join(other.acdd_metadata)

    @property
    def metadata(self) -> dict:
        """All metadata merged into a single dict (required first, ACDD last)."""
        return {
            **self.required_attributes,
            **{k: v for k, v in self.recommended_attributes.items() if v},
            **{k: v for k, v in self.other_metadata.items() if v},
            **self.acdd_metadata.adjusted_dict,
        }
var metadata : dict-
Expand source code
@property
def metadata(self) -> dict:
    """All metadata merged into a single dict (required first, ACDD last)."""
    merged = dict(self.required_attributes)
    merged.update({k: v for k, v in self.recommended_attributes.items() if v})
    merged.update({k: v for k, v in self.other_metadata.items() if v})
    merged.update(self.acdd_metadata.adjusted_dict)
    return merged


def check_validity(self):
    """Raise ValueError if any required attribute is still unset."""
    for key, value in self.required_attributes.items():
        if value is None:
            raise ValueError(f"Required attribute {key} is missing")


def get_attribute(self, attribute_name):
    """Return the value stored under attribute_name, searching every section; None when absent."""
    if attribute_name in self.required_attributes:
        return self.required_attributes[attribute_name]
    if attribute_name in self.recommended_attributes:
        return self.recommended_attributes[attribute_name]
    if self.acdd_metadata.get_attribute(attribute_name):
        return self.acdd_metadata.get_attribute(attribute_name)
    return self.other_metadata.get(attribute_name)


def init_application(self, application_profile):
    """Hook for application profiles; the base metadata section does nothing here."""
    pass


def join(self, other: "MetaDataSection"):
    """Merge attributes from other into self; conflicting values are kept and reported."""
    source_dicts = (
        other.required_attributes,
        other.recommended_attributes,
        other.other_metadata,
    )
    for attr_dict in source_dicts:
        for attribute, value in attr_dict.items():
            mine = self.get_attribute(attribute)
            if value and not mine:
                self.set_attribute(attribute, value)
            elif value and mine != value:
                # Conflicts are only reported, not resolved; self keeps its value.
                print(f"Attribute {attribute} is different in both MetaDataSection objects")
    self.acdd_metadata.join(other.acdd_metadata)


def set_attribute(self, attribute_name, value):
    """Store value under attribute_name; numeric-looking values become float."""
    stored = float(value) if is_number(value) else value
    if attribute_name in self.required_attributes:
        self.required_attributes[attribute_name] = stored
    elif attribute_name in self.recommended_attributes:
        self.recommended_attributes[attribute_name] = stored
    # Deliberately not elif: values are also offered to the ACDD handler and,
    # when rejected there, land in other_metadata as well.
    if not self.acdd_metadata.set_attribute(attribute_name, stored):
        self.other_metadata[attribute_name] = stored
class iCSVFile (filename: str = None)-
Class to represent an iCSV file.
Attributes
metadata:MetadataSection- Metadata section of the iCSV file.
fields:FieldsSection- Fields section of the iCSV file.
geometry:Representation class- Geometry section of the iCSV file.
data:pd.Dataframe- Data section of the iCSV file.
filename- The name of the iCSV file.
skip_lines- The number of lines to skip when reading the file.
Methods
load_file(filename: str = None): Load an iCSV file. parse_geometry(): Parse the geometry section of the iCSV file. info(): Print a summary of the iCSV file. to_xarray(): Convert the iCSV file to an xarray dataset. setData(data: pd.DataFrame, colnames: Optional[list] = None): Set the data of the iCSV file. write(filename: str = None): Write the iCSV file to a file.
Expand source code
class iCSVFile:
    """
    Class to represent an iCSV file.

    Attributes:
        metadata (MetaDataSection): Metadata section of the iCSV file.
        fields (FieldsSection): Fields section of the iCSV file.
        geometry (Geometry): Geometry section of the iCSV file.
        data (pd.DataFrame): Data section of the iCSV file.
        filename (str): The name of the iCSV file.
        skip_lines (int): The number of header lines to skip when reading the data.

    Methods:
        load_file(filename: str = None): Load an iCSV file.
        parse_geometry(): Parse the geometry section of the iCSV file.
        info(): Print a summary of the iCSV file.
        to_xarray(): Convert the iCSV file to an xarray dataset.
        setData(data: pd.DataFrame, colnames: Optional[list] = None): Set the data of the iCSV file.
        write(filename: str = None): Write the iCSV file to a file.
    """

    def __init__(self, filename: str = None):
        self.metadata = MetaDataSection()
        self.fields = FieldsSection()
        self.geometry = Geometry()
        self.data = None
        self.filename = filename
        self.skip_lines = 0
        if self.filename:
            self.load_file()

    def __str__(self) -> str:
        return f"File: {self.filename}\n{self.metadata}\n{self.fields}\n{self.geometry}"

    def __eq__(self, value: object) -> bool:
        # Data is deliberately excluded: equality means "same header sections".
        try:
            for attr in ['metadata', 'fields', 'geometry']:
                self_value = getattr(self, attr)
                value_value = getattr(value, attr)
                if self_value != value_value:
                    return False
            return True
        except AttributeError:
            return False

    def _parse_comment_line(self, line, section):
        """Dispatch one header comment line; returns the (possibly new) current section."""
        if line == "[METADATA]":
            return "metadata"
        elif line == "[FIELDS]":
            self.metadata.check_validity()  # to parse fields we need valid metadata
            return "fields"
        elif line == "[DATA]":
            return "data"
        else:
            return self._parse_section_line(line, section)

    def _parse_section_line(self, line, section):
        """Parse a 'key = value' line belonging to the current header section."""
        if not section:
            raise ValueError("No section specified")
        line_vals = line.split("=")
        if len(line_vals) != 2:
            # Fixed message: the parser requires exactly one "=" per line
            # (the old message claimed it "got 2 assignment operators").
            raise ValueError(f"Invalid {section} line: {line}, expected exactly one assignment operator \"=\"")
        if section == "metadata":
            self.metadata.set_attribute(line_vals[0].strip(), line_vals[1].strip())
        elif section == "fields":
            fields_vec = [field.strip() for field in line_vals[1].split(self.metadata.get_attribute("field_delimiter"))]
            self.fields.set_attribute(line_vals[0].strip(), fields_vec)
        elif section == "data":
            raise TypeError("Data section should not contain any comments")
        return section

    def _update_columns(self):
        """Apply the parsed field names to the DataFrame and parse time columns."""
        self.data.columns = self.fields.fields
        for field in ["time", "timestamp"]:
            if field in self.fields.fields:
                self.data[field] = pd.to_datetime(self.data[field])

    def load_file(self, filename: str = None):
        """Loads an iCSV file and parses its contents.

        Args:
            filename (str, optional): The path to the iCSV file. If not provided, the previously set filename will be used.

        Raises:
            ValueError: If the file is not a valid iCSV file or if the data section is not specified.

        Returns:
            None
        """
        if filename:
            self.filename = filename
        section = ""
        with open(self.filename, 'r') as file:
            first_line = file.readline().rstrip()  # rstrip() is used to remove the trailing newline
            if first_line not in FIRSTLINES:
                raise ValueError("Not an iCSV file")
            line_number = 1  # need to find the line number where the data starts
            for line in file:
                if line.startswith("#"):
                    line_number += 1
                    line = line[1:].strip()
                    section = self._parse_comment_line(line.strip(), section)
                else:
                    if section != "data":
                        raise ValueError("Data section was not specified")
                    self.skip_lines = line_number
                    break
        self.data = pd.read_csv(self.filename, skiprows=self.skip_lines, header=None, sep=self.metadata.get_attribute("field_delimiter"))
        self.fields.check_validity(self.data.shape[1])  # check if the number of fields match the number of columns
        self._update_columns()
        self.parse_geometry()

    def parse_geometry(self):
        """Populate self.geometry from the metadata.

        The geometry is either stored in a data column (column_name is set)
        or is a fixed location encoded in the metadata (set_location).
        """
        # Hoisted the assignments that were duplicated in both branches.
        geometry_value = self.metadata.get_attribute("geometry")
        self.geometry.geometry = geometry_value
        self.geometry.srid = self.metadata.get_attribute("srid")
        if geometry_value in self.fields.get_attribute("fields"):
            self.geometry.column_name = self.metadata.get_attribute("column_name")
        else:
            self.geometry.set_location()

    def info(self):
        """
        Prints information about the object and its data.

        This method prints the object itself and the head of its data.

        Returns:
            None
        """
        print(self)
        print("\nData:")
        print(self.data.head())

    def to_xarray(self) -> xr.Dataset:
        """
        Converts the data to an xarray dataset.

        Per-field attributes (units, long_name, ...) are attached to the
        corresponding data variables.

        Returns:
            xarray.Dataset: The converted xarray dataset.
        """
        arr = self.data.to_xarray()
        arr.attrs = self.metadata.metadata
        for i, var in enumerate(arr.data_vars):
            # Fixed: attach the i-th entry of each attribute vector under its
            # name; the old code overwrote .attrs with a bare scalar.
            for key, vec in self.fields.miscalleneous_fields.items():
                arr[var].attrs[key] = vec[i]
        # Fixed: the dataset was built but never returned.
        return arr

    def setData(self, data: pd.DataFrame, colnames: Optional[list] = None):
        """
        Sets the data of the iCSV file.

        Args:
            data (pd.DataFrame): The data to set.
            colnames (list): The names of the columns in the data.

        Raises:
            ValueError: If colnames does not match the data width, or if no
                usable column names are available.

        Returns:
            None
        """
        self.data = data
        if colnames:
            if len(colnames) != self.data.shape[1]:
                raise ValueError("Number of columns in data does not match the number of column names")
            self.fields.set_attribute("fields", colnames)
        else:
            colnames = self.data.columns.to_list()
            # A default RangeIndex stringifies to "0", "1", ... — treat that as missing names.
            if colnames[0] == "0" or colnames[0] == 0:
                raise ValueError("Column names are not provided")
            self.fields.set_attribute("fields", colnames)
        # Ensure 'timestamp' is the first column if it exists
        if 'timestamp' in self.data.columns:
            cols = self.data.columns.tolist()
            if cols[0] != 'timestamp':
                cols.insert(0, cols.pop(cols.index('timestamp')))
                self.data = self.data[cols]
                self.fields.set_attribute("fields", self.data.columns)

    def write(self, filename: str = None):
        """
        Writes the metadata, fields, and data to a CSV file.

        Args:
            filename (str, optional): The name of the file to write. If not provided, the current filename will be used.

        Returns:
            None
        """
        if filename:
            self.filename = filename
        # Validate before touching the file so we never leave a half-written header.
        self.metadata.check_validity()
        self.fields.check_validity(self.data.shape[1])
        with open(self.filename, 'w') as file:
            file.write(f"{FIRSTLINES[-1]}\n")
            file.write("# [METADATA]\n")
            for key, val in self.metadata.metadata.items():
                file.write(f"# {key} = {val}\n")
            file.write("# [FIELDS]\n")
            for key, val in self.fields.all_fields.items():
                fields_string = self.metadata.get_attribute("field_delimiter").join(str(value) for value in val)
                file.write(f"# {key} = {fields_string}\n")
            file.write("# [DATA]\n")
        self.data.to_csv(self.filename, mode='a', index=False, header=False, sep=self.metadata.get_attribute("field_delimiter"))
Methods
def info(self)-
Prints information about the object and its data.
This method prints the object itself and the head of its data.
Args
None
Returns
None
Expand source code
def info(self): """ Prints information about the object and its data. This method prints the object itself and the head of its data. Args: None Returns: None """ print(self) print("\nData:") print(self.data.head()) def load_file(self, filename: str = None)-
Loads an iCSV file and parses its contents.
Args
filename:str, optional- The path to the iCSV file. If not provided, the previously set filename will be used.
Raises
ValueError- If the file is not a valid iCSV file or if the data section is not specified.
Returns
None
Expand source code
def load_file(self, filename: str = None): """Loads an iCSV file and parses its contents. Args: filename (str, optional): The path to the iCSV file. If not provided, the previously set filename will be used. Raises: ValueError: If the file is not a valid iCSV file or if the data section is not specified. Returns: None """ if filename: self.filename = filename section = "" with open(self.filename, 'r') as file: first_line = file.readline().rstrip() # rstrip() is used to remove the trailing newline if first_line not in FIRSTLINES: raise ValueError("Not an iCSV file") line_number = 1 # need to find the line number where the data starts for line in file: if line.startswith("#"): line_number += 1 line = line[1:].strip() section = self._parse_comment_line(line.strip(), section) else: if section != "data": raise ValueError("Data section was not specified") self.skip_lines = line_number break self.data = pd.read_csv(self.filename, skiprows=self.skip_lines, header=None, sep=self.metadata.get_attribute("field_delimiter")) self.fields.check_validity(self.data.shape[1]) # check if the number of fields match the number of columns self._update_columns() self.parse_geometry() def parse_geometry(self)-
Expand source code
def parse_geometry(self): if self.metadata.get_attribute("geometry") in self.fields.get_attribute("fields"): self.geometry.geometry = self.metadata.get_attribute("geometry") self.geometry.srid = self.metadata.get_attribute("srid") self.geometry.column_name = self.metadata.get_attribute("column_name") else: self.geometry.geometry = self.metadata.get_attribute("geometry") self.geometry.srid = self.metadata.get_attribute("srid") self.geometry.set_location() def setData(self, data: pandas.core.frame.DataFrame, colnames: Optional[list] = None)-
Sets the data of the iCSV file.
Args
data:pd.DataFrame- The data to set.
colnames:list- The names of the columns in the data.
Returns
None
Expand source code
def setData(self, data: pd.DataFrame, colnames: Optional[list] = None): """ Sets the data of the iCSV file. Args: data (pd.DataFrame): The data to set. colnames (list): The names of the columns in the data. Returns: None """ self.data = data if colnames: if len(colnames) != self.data.shape[1]: raise ValueError("Number of columns in data does not match the number of column names") self.fields.set_attribute("fields", colnames) else: colnames = self.data.columns.to_list() if colnames[0] == "0" or colnames[0] == 0: raise ValueError("Column names are not provided") self.fields.set_attribute("fields", colnames) # Ensure 'timestamp' is the first column if it exists if 'timestamp' in self.data.columns: cols = self.data.columns.tolist() if cols[0] != 'timestamp': cols.insert(0, cols.pop(cols.index('timestamp'))) self.data = self.data[cols] self.fields.set_attribute("fields", self.data.columns) def to_xarray(self) ‑> xarray.core.dataset.Dataset-
Converts the data to an xarray dataset.
Returns
xarray.Dataset- The converted xarray dataset.
Expand source code
def to_xarray(self) -> xr.Dataset: """ Converts the data to an xarray dataset. Returns: xarray.Dataset: The converted xarray dataset. """ arr = self.data.to_xarray() arr.attrs = self.metadata.metadata for i,var in enumerate(arr.data_vars): for _, vec in self.fields.miscalleneous_fields.items(): arr[var].attrs = vec[i] def write(self, filename: str = None)-
Writes the metadata, fields, and data to a CSV file.
Args
filename:str, optional- The name of the file to write. If not provided, the current filename will be used.
Returns
None
Expand source code
def write(self, filename: str = None): """ Writes the metadata, fields, and data to a CSV file. Args: filename (str, optional): The name of the file to write. If not provided, the current filename will be used. Returns: None """ if filename: self.filename = filename self.metadata.check_validity() self.fields.check_validity(self.data.shape[1]) with open(self.filename, 'w') as file: file.write(f"{FIRSTLINES[-1]}\n") file.write("# [METADATA]\n") for key, val in self.metadata.metadata.items(): file.write(f"# {key} = {val}\n") file.write("# [FIELDS]\n") for key, val in self.fields.all_fields.items(): fields_string = self.metadata.get_attribute("field_delimiter").join(str(value) for value in val) file.write(f"# {key} = {fields_string}\n") file.write("# [DATA]\n") self.data.to_csv(self.filename, mode='a', index=False, header=False, sep=self.metadata.get_attribute("field_delimiter"))
class iCSVProfiles (filename: str = None)-
Class to represent an iCSV file containing snow profile data.
The iCSVProfiles extends the iCSVFile class to handle the specific structure and requirements of snow profile data, which includes multiple timestamped profile measurements and point measurements.
Attributes (additional to iCSVFile): dates (list): List of datetime objects representing measurement dates in the file. date_lines (list): List of line numbers where date entries begin in the file. data (dict): Dictionary mapping datetime objects to pandas DataFrames containing the profile data for that timestamp.
Key Features: - Handles multiple time-stamped profiles in a single file - Separates point measurements from profile measurements - Provides methods to extract and filter point and profile values - Can convert to xarray Dataset for multi-dimensional data analysis
The snowprofile format follows the iCSV specification with the addition of [DATE=timestamp] markers in the data section to separate measurements from different dates. Each profile must include a 'layer_number' field to identify profile layers versus point measurements.
Expand source code
class iCSVProfiles(iCSVFile):
    """
    Class to represent an iCSV file containing snow profile data.

    The iCSVProfiles extends the iCSVFile class to handle the specific structure
    and requirements of snow profile data, which includes multiple timestamped
    profile measurements and point measurements.

    Attributes (additional to iCSVFile):
        dates (list): List of datetime objects representing measurement dates in the file.
        date_lines (list): List of line numbers where date entries begin in the file.
        data (dict): Dictionary mapping datetime objects to pandas DataFrames
            containing the profile data for that timestamp.

    Key Features:
        - Handles multiple time-stamped profiles in a single file
        - Separates point measurements from profile measurements
        - Provides methods to extract and filter point and profile values
        - Can convert to xarray Dataset for multi-dimensional data analysis

    The snowprofile format follows the iCSV specification with the addition of
    [DATE=timestamp] markers in the data section to separate measurements from
    different dates. Each profile must include a 'layer_number' field to identify
    profile layers versus point measurements.
    """

    def __init__(self, filename: str = None):
        self.dates = []
        self.date_lines = []
        super().__init__(filename)

    def _parse_comment_line(self, line, section, line_number):
        """Dispatch a '#' comment line: section headers switch state, anything else is a section entry."""
        if line == "[METADATA]":
            return "metadata"
        elif line == "[FIELDS]":
            self.metadata.check_validity()  # to parse fields we need valid metadata
            return "fields"
        elif line == "[DATA]":
            return "data"
        else:
            return self._parse_section_line(line, section, line_number)

    def _parse_section_line(self, line, section, line_number):
        """Parse one 'key = value' line (or a [DATE=...] marker in the data section)."""
        if not section:
            raise ValueError("No section specified")
        # Split on the FIRST "=" only so that values which themselves contain
        # "=" (e.g. URLs in metadata) still parse instead of raising.
        line_vals = line.split("=", 1)
        if len(line_vals) != 2:
            raise ValueError(f"Invalid {section} line: {line}, expected a single \"key = value\" assignment")
        if section == "metadata":
            self.metadata.set_attribute(line_vals[0].strip(), line_vals[1].strip())
        elif section == "fields":
            fields_vec = [field.strip() for field in line_vals[1].split(self.metadata.get_attribute("field_delimiter"))]
            self.fields.set_attribute(line_vals[0].strip(), fields_vec)
        elif section == "data":
            # [DATE=timestamp] markers delimit the per-date profile tables.
            if "[DATE=" in line:
                date_str = line.split('[DATE=')[1].split(']')[0].strip()
                self.dates.append(datetime.datetime.fromisoformat(date_str))
                self.date_lines.append(line_number)
            else:
                raise ValueError(f"Invalid data line: {line}")
        return section

    def load_file(self, filename: str = None):
        """Loads an iCSV file and parses its contents, extracting the dates and
        data lines for a snow profile.

        Args:
            filename (str, optional): The path to the iCSV file. If not provided,
                the previously set filename will be used.

        Raises:
            ValueError: If the file is not a valid iCSV file or if the data
                section is not specified.

        Returns:
            None
        """
        self.data = dict()
        if filename:
            self.filename = filename
        section = ""
        with open(self.filename, 'r') as file:
            first_line = file.readline().rstrip()  # rstrip() is used to remove the trailing newline
            if first_line not in FIRSTLINES_SNOWPROFILE:
                raise ValueError("Not an iCSV file with the snowprofile application profile")
            line_number = 1  # need to find the line number where the data starts
            for line in file:
                line_number += 1
                if line.startswith("#"):
                    line = line[1:].strip()
                    section = self._parse_comment_line(line.strip(), section, line_number)
                else:
                    if section != "data":
                        raise ValueError("Data section was not specified")
        # Each [DATE=...] marker line starts a table that runs until the next
        # marker (or EOF for the last one).
        for (i, date) in enumerate(self.dates):
            first_data_line = self.date_lines[i]
            last_data_line = self.date_lines[i + 1] if i + 1 < len(self.dates) else line_number + 1
            self.data[date] = pd.read_csv(self.filename, skiprows=first_data_line,
                                          nrows=last_data_line - first_data_line - 1, header=None,
                                          sep=self.metadata.get_attribute("field_delimiter"))
            self.data[date].columns = self.fields.fields
        self.fields.check_validity(self.data[self.dates[0]].shape[1])  # check if the number of fields match the number of columns
        self.parse_geometry()

    def info(self):
        """
        Prints information about the object and its data.

        This method prints the object itself, the parsed dates, and the head of
        the first profile.

        Args:
            None

        Returns:
            None
        """
        print(self)
        print("\nDates:")
        print(self.dates)
        print("\nFirst Profile:")
        print(self.data[self.dates[0]].head())

    def to_xarray(self):
        """
        Converts the data to a single 3D xarray Dataset with 'time' as one dimension.

        Returns:
            xarray.Dataset: The combined xarray dataset.
        """
        # Convert each DataFrame to xarray DataArray
        arrays = []
        for date in self.dates:
            df = self.data[date].copy()
            df.set_index("layer_number", inplace=True)
            arrays.append(df.to_xarray())
        # Concatenate along new time dimension
        ds = xr.concat(arrays, dim="time")
        ds = ds.assign_coords(time=self.dates)
        # Optionally add metadata
        ds.attrs = self.metadata.metadata
        return ds

    def setData(self, timestamp: datetime.datetime, data: pd.DataFrame, colnames: Optional[list] = None):
        """
        Adds a profile table for the given timestamp.

        Args:
            timestamp (datetime.datetime): The date of the profile.
            data (pd.DataFrame): The profile data for that date.
            colnames (list, optional): Column names to assign to `data`.
                Previously this parameter was accepted but silently ignored.
        """
        if not self.data:
            self.data = dict()
        if colnames is not None:
            data.columns = colnames
        self.dates.append(timestamp)
        self.data[timestamp] = data

    def write(self, filename: str = None):
        """
        Writes the metadata, fields, and data to a CSV file.

        Args:
            filename (str, optional): The name of the file to write. If not
                provided, the current filename will be used.

        Raises:
            ValueError: If required metadata ('model') or the required
                'layer_number' field is missing, or validity checks fail.

        Returns:
            None
        """
        if filename:
            self.filename = filename
        self.metadata.check_validity()
        if "model" not in self.metadata.metadata:
            raise ValueError("model is a required metadata for the Snowprofile application profile")
        first_key = self.dates[0]
        self.fields.check_validity(self.data[first_key].shape[1])
        if "layer_number" not in self.fields.fields:
            raise ValueError("layer_number is a required field for the Snowprofile application profile")
        with open(self.filename, 'w') as file:
            file.write(f"{FIRSTLINES_SNOWPROFILE[-1]}\n")
            file.write("# [METADATA]\n")
            for key, val in self.metadata.metadata.items():
                file.write(f"# {key} = {val}\n")
            file.write("# [FIELDS]\n")
            for key, val in self.fields.all_fields.items():
                fields_string = self.metadata.get_attribute("field_delimiter").join(str(value) for value in val)
                file.write(f"# {key} = {fields_string}\n")
            file.write("# [DATA]\n")
            for date in self.dates:
                file.write(f"# [DATE={date.isoformat()}]\n")
                # write through the already-open handle so rows follow the marker
                self.data[date].to_csv(file, mode='a', index=False, header=False,
                                       sep=self.metadata.get_attribute("field_delimiter"))

    def get_point_values(self):
        """
        Extracts point measurements from the snow profile data.

        Retrieves all data rows where 'layer_number' equals 'point' across all
        dates, combining them into a single DataFrame with timestamps.
        Automatically filters out columns that contain only nodata values.

        Returns:
            pandas.DataFrame: DataFrame containing all point measurements with
                timestamps, with columns filtered to include only those with
                valid data. Returns an empty DataFrame if no point measurements
                are found.
        """
        nodata = self.metadata.get_attribute("nodata")
        point_rows = []
        for date in self.dates:
            data_at_date = self.data[date]
            if "point" not in data_at_date["layer_number"].values:
                continue
            point_df = data_at_date[data_at_date["layer_number"] == "point"].copy()
            point_df.insert(0, "timestamp", date)
            point_rows.append(point_df)
        if not point_rows:
            # Return a single empty DataFrame, matching the documented return
            # type (previously a (DataFrame, list) tuple was returned here,
            # inconsistent with the non-empty path).
            return pd.DataFrame()
        all_points = pd.concat(point_rows, ignore_index=True)
        # Exclude the 'timestamp' and 'layer_number' columns from nodata filtering
        cols_to_check = [col for col in all_points.columns if col not in ["timestamp", "layer_number"]]
        # Find columns where not all values equal the nodata value
        valid_cols = [col for col in cols_to_check if not (all_points[col] == nodata).all()]
        # Always keep 'timestamp'
        final_cols = ["timestamp"] + valid_cols
        filtered_points = all_points[final_cols]
        return filtered_points

    def get_profile_values(self, as_xarray=False):
        """
        Extracts profile measurements from the snow profile data.

        Retrieves all data rows where 'layer_number' is not 'point' across all
        dates, organizing them by timestamp. Automatically filters out columns
        containing only nodata values for each profile.

        Args:
            as_xarray (bool, optional): If True, returns data as an xarray
                Dataset with dimensions (time, layer) instead of a dictionary.
                Default is False.

        Returns:
            dict or xarray.Dataset: If as_xarray=False (default), returns a
                dictionary mapping datetime objects to pandas DataFrames with
                layer_number as index. If as_xarray=True, returns a
                multi-dimensional xarray Dataset with dimensions of time and
                layer, and variables for each measurement type.
        """
        NODATA = self.metadata.get_attribute("nodata")
        profiles_dict = {}
        for date in self.dates:
            data_at_date = self.data[date]
            profile_df = data_at_date[data_at_date["layer_number"] != "point"].copy()
            colnames = profile_df.columns
            # Remove columns where all values equal the nodata value (except 'layer_number')
            valid_cols = [col for col in colnames if not (profile_df[col] == NODATA).all()]
            filtered_df = profile_df[valid_cols]
            profiles_dict[date] = filtered_df.set_index("layer_number")
        if as_xarray and profiles_dict:
            # Convert to xarray Dataset: dims=time, layer, variables=fields
            # Find union of all columns
            all_vars = set()
            for df in profiles_dict.values():
                all_vars.update(df.columns)
            all_vars = sorted(all_vars)
            # Build a 3D array: (time, layer, variable)
            times = list(profiles_dict.keys())
            max_layers = max(len(df) for df in profiles_dict.values())
            data = {var: [] for var in all_vars}
            layer_numbers = []
            for date in times:
                df = profiles_dict[date]
                # Pad ragged profiles to max_layers so all time slices align
                layer_numbers.append(df.index.tolist() + [None] * (max_layers - len(df)))
                for var in all_vars:
                    col_data = df[var].tolist() if var in df.columns else [NODATA] * len(df)
                    col_data += [NODATA] * (max_layers - len(col_data))
                    data[var].append(col_data)
            ds = xr.Dataset(
                {var: (['time', 'layer'], data[var]) for var in all_vars},
                coords={
                    'time': times,
                    'layer': range(max_layers),
                    'layer_number': (['time', 'layer'], layer_numbers)
                }
            )
            return ds
        return profiles_dict
Ancestors
Methods
def get_point_values(self)-
Extracts point measurements from the snow profile data.
Retrieves all data rows where 'layer_number' equals 'point' across all dates, combining them into a single DataFrame with timestamps. Automatically filters out columns that contain only nodata values.
Returns
pandas.DataFrame- DataFrame containing all point measurements with timestamps, with columns filtered to include only those with valid data. Returns an empty DataFrame if no point measurements are found.
Expand source code
def get_point_values(self): """ Extracts point measurements from the snow profile data. Retrieves all data rows where 'layer_number' equals 'point' across all dates, combining them into a single DataFrame with timestamps. Automatically filters out columns that contain only nodata values. Returns: pandas.DataFrame: DataFrame containing all point measurements with timestamps, with columns filtered to include only those with valid data. Returns an empty DataFrame if no point measurements are found. """ nodata = self.metadata.get_attribute("nodata") point_rows = [] for date in self.dates: data_at_date = self.data[date] if "point" not in data_at_date["layer_number"].values: continue point_df = data_at_date[data_at_date["layer_number"] == "point"].copy() point_df.insert(0, "timestamp", date) point_rows.append(point_df) if not point_rows: return pd.DataFrame(), [] # Empty DataFrame and empty list if no points found all_points = pd.concat(point_rows, ignore_index=True) # Exclude the 'timestamp' and 'layer_number' columns from nodata filtering cols_to_check = [col for col in all_points.columns if col not in ["timestamp", "layer_number"]] # Find columns where not all values are -999 valid_cols = [col for col in cols_to_check if not (all_points[col] == nodata).all()] # Always keep 'timestamp' and 'layer_number' final_cols = ["timestamp"] + valid_cols filtered_points = all_points[final_cols] return filtered_points def get_profile_values(self, as_xarray=False)-
Extracts profile measurements from the snow profile data.
Retrieves all data rows where 'layer_number' is not 'point' across all dates, organizing them by timestamp. Automatically filters out columns containing only nodata values for each profile.
Args
as_xarray:bool, optional- If True, returns data as an xarray Dataset with dimensions (time, layer) instead of a dictionary. Default is False.
Returns
dict or xarray.Dataset - If as_xarray=False (default), returns a dictionary mapping datetime objects to pandas DataFrames with layer_number as index. If as_xarray=True, returns a multi-dimensional xarray Dataset with dimensions of time and layer, and variables for each measurement type.
Expand source code
def get_profile_values(self, as_xarray=False): """ Extracts profile measurements from the snow profile data. Retrieves all data rows where 'layer_number' is not 'point' across all dates, organizing them by timestamp. Automatically filters out columns containing only nodata values for each profile. Args: as_xarray (bool, optional): If True, returns data as an xarray Dataset with dimensions (time, layer) instead of a dictionary. Default is False. Returns: dict or xarray.Dataset: If as_xarray=False (default), returns a dictionary mapping datetime objects to pandas DataFrames with layer_number as index. If as_xarray=True, returns a multi-dimensional xarray Dataset with dimensions of time and layer, and variables for each measurement type. """ NODATA = self.metadata.get_attribute("nodata") profiles_dict = {} for date in self.dates: data_at_date = self.data[date] profile_df = data_at_date[data_at_date["layer_number"] != "point"].copy() colnames = profile_df.columns # Remove columns where all values are -999 (except 'layer_number') valid_cols = [col for col in colnames if not (profile_df[col] == NODATA).all()] filtered_df = profile_df[valid_cols] profiles_dict[date] = filtered_df.set_index("layer_number") if as_xarray and profiles_dict: # Convert to xarray Dataset: dims=time, layer, variables=fields # Find union of all columns all_vars = set() for df in profiles_dict.values(): all_vars.update(df.columns) all_vars = sorted(all_vars) # Build a 3D array: (time, layer, variable) times = list(profiles_dict.keys()) max_layers = max(len(df) for df in profiles_dict.values()) data = {var: [] for var in all_vars} layer_numbers = [] for date in times: df = profiles_dict[date] layer_numbers.append(df.index.tolist() + [None]*(max_layers - len(df))) for var in all_vars: col_data = df[var].tolist() if var in df.columns else [NODATA]*len(df) col_data += [NODATA]*(max_layers - len(col_data)) data[var].append(col_data) ds = xr.Dataset( {var: (['time', 'layer'], data[var]) for var in 
all_vars}, coords={ 'time': times, 'layer': range(max_layers), 'layer_number': (['time', 'layer'], layer_numbers) } ) return ds return profiles_dict def load_file(self, filename: str = None)-
Loads an iCSV file and parses its contents, extracting the dates and data lines for a snow profile.
Args
filename:str, optional- The path to the iCSV file. If not provided, the previously set filename will be used.
Raises
ValueError- If the file is not a valid iCSV file or if the data section is not specified.
Returns
None
Expand source code
def load_file(self, filename: str = None): """Loads an iCSV file and parses its contents, extracting the dates and data lines for a snow profile. Args: filename (str, optional): The path to the iCSV file. If not provided, the previously set filename will be used. Raises: ValueError: If the file is not a valid iCSV file or if the data section is not specified. Returns: None """ self.data = dict() if filename: self.filename = filename section = "" with open(self.filename, 'r') as file: first_line = file.readline().rstrip() # rstrip() is used to remove the trailing newline if first_line not in FIRSTLINES_SNOWPROFILE: raise ValueError("Not an iCSV file with the snowprofile application profile") line_number = 1 # need to find the line number where the data starts for line in file: line_number += 1 if line.startswith("#"): line = line[1:].strip() section = self._parse_comment_line(line.strip(), section, line_number) else: if section != "data": raise ValueError("Data section was not specified") for (i, date) in enumerate(self.dates): first_data_line = self.date_lines[i] last_data_line = self.date_lines[i+1] if i+1 < len(self.dates) else line_number + 1 self.data[date] = pd.read_csv(self.filename, skiprows=first_data_line, nrows=last_data_line-first_data_line-1, header=None, sep=self.metadata.get_attribute("field_delimiter")) self.data[date].columns = self.fields.fields self.fields.check_validity(self.data[self.dates[0]].shape[1]) # check if the number of fields match the number of columns self.parse_geometry() def to_xarray(self)-
Converts the data to a single 3D xarray Dataset with 'time' as one dimension.
Returns
xarray.Dataset- The combined xarray dataset.
Expand source code
def to_xarray(self): """ Converts the data to a single 3D xarray Dataset with 'time' as one dimension. Returns: xarray.Dataset: The combined xarray dataset. """ # Convert each DataFrame to xarray DataArray arrays = [] for date in self.dates: df = self.data[date].copy() df.set_index("layer_number", inplace=True) arrays.append(df.to_xarray()) # Concatenate along new time dimension ds = xr.concat(arrays, dim="time") ds = ds.assign_coords(time=self.dates) # Optionally add metadata ds.attrs = self.metadata.metadata return ds
Inherited members