Module snowpat.icsv

Expand source code
from .icsv_file import iCSVFile
from .application_profile import iCSVProfiles, append_timepoint
from .factory import read, from_smet
from .header import MetaDataSection, FieldsSection
__all__ = ["iCSVFile", "read", "from_smet", "MetaDataSection", "FieldsSection", "iCSVProfiles", "append_timepoint"]

Sub-modules

snowpat.icsv.application_profile
snowpat.icsv.factory
snowpat.icsv.header
snowpat.icsv.icsv_file
snowpat.icsv.utility

Functions

def append_timepoint(filename: str, timestamp: datetime.datetime, data: pandas.core.frame.DataFrame, field_delimiter: str = ',')

Appends a new timepoint to the iCSV file.

Args

filename : str
The name of the file to append to.
timestamp : datetime.datetime
The timestamp of the new timepoint.
data : pd.DataFrame
The data to append.

Returns

None

Expand source code
def append_timepoint(filename: str, timestamp: datetime.datetime, data: pd.DataFrame, field_delimiter: str = ","):
    """
    Appends a new timepoint to the iCSV file.

    A comment line of the form ``# [DATE=<ISO timestamp>]`` is written first,
    followed by the data rows (no header, no index).

    Args:
        filename (str): The name of the file to append to.
        timestamp (datetime.datetime): The timestamp of the new timepoint.
        data (pd.DataFrame): The data to append.
        field_delimiter (str, optional): Delimiter separating the data values.
            Defaults to ",".

    Returns:
        None
    """
    with open(filename, 'a') as file:
        file.write(f"# [DATE={timestamp.isoformat()}]\n")
        # 'mode' is ignored by to_csv when a file handle is passed, so it is
        # omitted here; the handle is already opened in append mode.
        data.to_csv(file, index=False, header=False, sep=field_delimiter)
def from_smet(smet: SMETFile) ‑> iCSVFile

Converts an SMETFile object to an iCSVFile object.

Args

smet : SMETFile
The SMETFile object to convert.

Returns

iCSVFile
The converted iCSVFile object.
Expand source code
def from_smet(smet: SMETFile) -> iCSVFile:
    """
    Converts an SMETFile object to an iCSVFile object.

    Args:
        smet (SMETFile): The SMETFile object to convert.

    Returns:
        iCSVFile: The converted iCSVFile object.
    """
    converted = iCSVFile()
    # Transfer field definitions and station location first, then the
    # remaining metadata, before attaching the data itself.
    _set_fields_and_location(converted, smet)
    _set_metadata(converted, smet)
    converted.data = smet.data
    # Sanity-check the assembled object against the number of data columns.
    _check_validity_and_parse_geometry(converted, converted.data.shape[1])
    return converted
def read(filename: str) ‑> iCSVFile

Reads an iCSV file and returns an iCSVFile object (or the respective application profile specific object).

Args

filename : str
The path to the iCSV file.

Returns

iCSVFile/ApplicationProfile: an iCSVFile or subclass object representing the contents of the file. The returned object has the following attributes: metadata (access attributes via metadata.get_attribute("key")), fields (access attributes via fields.get_attribute("key")), geometry (get the location via geometry.get_location()), data (the data section, as a pandas DataFrame), filename (the name of the iCSV file), and skip_lines (the number of header lines skipped when reading the file).

Expand source code
def read(filename: str) -> iCSVFile:
    """
    Reads an iCSV file and returns an iCSVFile object (or the respective application profile specific object).

    Args:
        filename (str): The path to the iCSV file.

    Returns:
        iCSVFile/ApplicationProfile: An iCSVFile or subclass object representing the contents of the file.

    The iCSVFile object has the following attributes:
        - metadata: The metadata section of the iCSV file.
            access attributes via metadata.get_attribute("key")
        - fields: The fields section of the iCSV file.
            access attributes via fields.get_attribute("key")
        - geometry: The geometry section of the iCSV file.
            get the location via geometry.get_location()
        - data: The data section of the iCSV file.
            As a pandas DataFrame.
        - filename: The name of the iCSV file.
        - skip_lines: The number of lines to skip when reading the file.

    Raises:
        ValueError: If the first line does not identify an iCSV file.
    """
    # Only the first line is needed to pick the right class; use a context
    # manager so the file handle is closed instead of leaked.
    with open(filename) as file:
        firstline = file.readline().rstrip()
    if firstline in FIRSTLINES_SNOWPROFILE:
        return iCSVProfiles(filename)
    if firstline in FIRSTLINES:
        return iCSVFile(filename)
    raise ValueError("Not an iCSV file")

Classes

class FieldsSection

A class used to represent the fields section of an iCSV file.

Attributes

fields : list
List of fields.
recommended_fields : list
Fields that are recommended to be present in the fields section.
other_fields : list
Fields that are not recommended to be present in the fields section.

Methods

check_validity(n_cols: int): performs a sanity check. __str__(): returns a string representation of the fields. set_attribute(attribute_name: str, value: list): sets an attribute. get_attribute(attribute_name: str): returns an attribute. all_fields: returns all fields. miscellaneous_fields: returns all fields that are not required.

Expand source code
class FieldsSection:
    """
    A class used to represent the fields section of an iCSV file.

    Attributes:
        fields (list): List of field (column) names.
        recommended_fields (dict): Per-column vectors recommended by the spec
            (units_multiplier, units, long_name, standard_name).
        other_fields (dict): Any other per-column vectors.

    Methods:
        check_validity(n_cols: int):
            Performs a sanity check.
        __str__():
            Returns a string representation of the fields.
        set_attribute(attribute_name: str, value: list):
            Sets an attribute.
        get_attribute(attribute_name: str):
            Returns an attribute.
        all_fields:
            Returns all fields.
        miscellaneous_fields:
            Returns all fields that are not required.
    """

    def __init__(self):
        self.fields = []
        self.recommended_fields = {
            "units_multiplier": [],
            "units": [],
            "long_name": [],
            "standard_name": [],
        }
        self.other_fields = {}

    def __str__(self):
        """Return a human-readable dump of every vector that is set."""
        recommended_fields_string = "\n".join(
            f"{key} : {value}"
            for key, value in self.recommended_fields.items()
            if value
        )
        other_fields_string = "\n".join(
            f"{key} : {value}" for key, value in self.other_fields.items() if value
        )
        return f"Fields: {self.fields}\nRecommended Fields:\n{recommended_fields_string}\nOther Fields:\n{other_fields_string}"

    def __eq__(self, value: object) -> bool:
        if not isinstance(value, FieldsSection):
            return False
        return all(
            getattr(self, attr) == getattr(value, attr)
            for attr in ("fields", "recommended_fields", "other_fields")
        )

    def check_validity(self, n_cols: int):
        """Check that every stored vector has exactly one entry per data column.

        Args:
            n_cols (int): Number of columns in the data section.

        Raises:
            ValueError: If no fields are set, or a non-empty vector's length
                differs from n_cols.
        """
        if not self.fields:
            raise ValueError("No fields provided")
        if len(self.fields) != n_cols:
            raise ValueError("Number of fields does not match the number of columns")
        # Empty vectors are allowed; only non-empty ones must match the count.
        for attr_dict in (self.recommended_fields, self.other_fields):
            for key, val in attr_dict.items():
                if val and len(val) != n_cols:
                    raise ValueError(
                        f"Number of {key} does not match the number of columns"
                    )

    def set_attribute(self, attribute_name, value: list):
        """Store a per-column vector; numeric strings are coerced to float."""
        value = [float(val) if is_number(val) else val for val in value]
        if attribute_name == "fields":
            self.fields = value
        elif attribute_name in self.recommended_fields:
            self.recommended_fields[attribute_name] = value
        else:
            # Unknown attribute names are kept rather than rejected.
            self.other_fields[attribute_name] = value

    def get_attribute(self, attribute_name):
        """Return the vector stored under attribute_name, or None if unknown."""
        if attribute_name == "fields":
            return self.fields
        if attribute_name in self.recommended_fields:
            return self.recommended_fields[attribute_name]
        return self.other_fields.get(attribute_name)

    @property
    def all_fields(self):
        """Every vector that is set, keyed by attribute name, plus 'fields'."""
        return {
            "fields": self.fields,
            **{k: v for k, v in self.recommended_fields.items() if v},
            **{k: v for k, v in self.other_fields.items() if v},
        }

    @property
    def miscalleneous_fields(self):
        """Every vector that is set, excluding 'fields' itself.

        Kept under its historical (misspelled) name for backward
        compatibility; prefer :attr:`miscellaneous_fields`.
        """
        return {
            **{k: v for k, v in self.recommended_fields.items() if v},
            **{k: v for k, v in self.other_fields.items() if v},
        }

    @property
    def miscellaneous_fields(self):
        """Correctly spelled alias for :attr:`miscalleneous_fields`."""
        return self.miscalleneous_fields

Instance variables

var all_fields
Expand source code
@property
def all_fields(self):
    """Return every per-column vector that is set, keyed by name, plus 'fields'."""
    return {
        "fields": self.fields,
        **{k: v for k, v in self.recommended_fields.items() if v},
        **{k: v for k, v in self.other_fields.items() if v},
    }
var miscalleneous_fields
Expand source code
@property
def miscalleneous_fields(self):
    """Return every per-column vector that is set, excluding 'fields' itself.

    NOTE(review): the name misspells "miscellaneous" but is public API — keep
    it for backward compatibility.
    """
    return {
        **{k: v for k, v in self.recommended_fields.items() if v},
        **{k: v for k, v in self.other_fields.items() if v},
    }

Methods

def check_validity(self, n_cols: int)
Expand source code
def check_validity(self, n_cols: int):
    """Check that every stored vector has exactly one entry per data column.

    Args:
        n_cols (int): Number of columns in the data section.

    Raises:
        ValueError: If no fields are set, or a non-empty vector's length
            differs from n_cols.
    """
    if not self.fields:
        raise ValueError("No fields provided")

    if len(self.fields) != n_cols:
        raise ValueError("Number of fields does not match the number of columns")
    # Empty vectors are allowed; only non-empty ones must match the count.
    for key, val in self.recommended_fields.items():
        if val and len(self.recommended_fields[key]) != n_cols:
            raise ValueError(
                f"Number of {key} does not match the number of columns"
            )

    for key, val in self.other_fields.items():
        if val and len(self.other_fields[key]) != n_cols:
            raise ValueError(
                f"Number of {key} does not match the number of columns"
            )
def get_attribute(self, attribute_name)
Expand source code
def get_attribute(self, attribute_name):
    """Return the vector stored under attribute_name, or None if unknown."""
    if attribute_name == "fields":
        return self.fields
    elif attribute_name in self.recommended_fields:
        return self.recommended_fields[attribute_name]
    else:
        if attribute_name in self.other_fields:
            return self.other_fields[attribute_name]
        return None
def set_attribute(self, attribute_name, value: list)
Expand source code
def set_attribute(self, attribute_name, value: list):
    """Store a per-column vector; numeric strings are coerced to float."""
    value = [float(val) if is_number(val) else val for val in value]
    if attribute_name == "fields":
        self.fields = value
    elif attribute_name in self.recommended_fields:
        self.recommended_fields[attribute_name] = value
    else:
        # Unknown attribute names are kept rather than rejected.
        self.other_fields[attribute_name] = value
class MetaDataSection

A class used to represent the metadata section of an iCSV file.

Attributes

required_attributes : dict
Attributes that are required to be present in the metadata.
recommended_attributes : dict
Attributes that are recommended to be present in the metadata.
acdd_metadata : dict
Metadata that is part of the ACDD standard.
other_metadata : dict
Metadata that is not part of the ACDD standard.

Methods

check_validity(): performs a sanity check. __str__(): returns a string representation of the metadata. set_attribute(attribute_name: str, value: any): sets an attribute. get_attribute(attribute_name: str): returns an attribute. metadata: returns all metadata. join(other: MetaDataSection): joins two metadata sections.

Expand source code
class MetaDataSection:
    """
    A class used to represent the metadata section of an iCSV file.

    Attributes:
        required_attributes (dict): Attributes that are required to be present in the metadata.
        recommended_attributes (dict): Attributes that are recommended to be present in the metadata.
        acdd_metadata (ACDDMetadata): Metadata that is part of the ACDD standard.
        other_metadata (dict): Metadata that is not part of the ACDD standard.

    Methods:
        check_validity():
            Performs a sanity check.
        __str__():
            Returns a string representation of the metadata.
        set_attribute(attribute_name: str, value: any):
            Sets an attribute.
        get_attribute(attribute_name: str):
            Returns an attribute.
        metadata:
            Returns all metadata.
        join(other: MetaDataSection):
            Joins two metadata sections.
    """
    def __init__(self):
        # Attributes an iCSV file cannot be parsed or written without.
        self.required_attributes = {
            "field_delimiter": None,
            "geometry": None,
            "srid": None,
        }
        self.recommended_attributes = {
            "station_id": None,
            "nodata": None,
            "timezone": None,
            "doi": None,
            "timestamp_meaning": None,
        }
        self.acdd_metadata = ACDDMetadata()
        self.other_metadata = {}

    def __str__(self):
        """Return a human-readable dump of every attribute that is set."""
        required_attribute_string = "\n".join(
            f"{key} : {value}"
            for key, value in self.required_attributes.items()
            if value is not None
        )
        recommended_attribute_string = "\n".join(
            f"{key} : {value}"
            for key, value in self.recommended_attributes.items()
            if value is not None
        )
        other_metadata_string = "\n".join(
            f"{key} : {value}"
            for key, value in self.other_metadata.items()
            if value is not None
        )
        return f"METADATA:\nRequired:\n{required_attribute_string}\nRecommended:\n{recommended_attribute_string}\n{self.acdd_metadata}\nOther Metadata:\n{other_metadata_string}"

    def __eq__(self, value: object) -> bool:
        if not isinstance(value, MetaDataSection):
            return False

        # Only keys set (non-None) on BOTH sides are compared; an attribute
        # present on one side only does not make the sections unequal.
        for attr in ["required_attributes", "recommended_attributes", "other_metadata"]:
            self_dict = getattr(self, attr)
            value_dict = getattr(value, attr)

            common_keys = self_dict.keys() & value_dict.keys()

            for key in common_keys:
                if self_dict[key] is not None and value_dict[key] is not None:
                    if self_dict[key] != value_dict[key]:
                        return False

        return self.acdd_metadata == value.acdd_metadata

    def init_application(self, application_profile):
        """Hook for application-profile subclasses; the base class does nothing."""
        pass

    def check_validity(self):
        """Raise ValueError if any required metadata attribute is still unset."""
        for key, value in self.required_attributes.items():
            if value is None:
                raise ValueError(f"Required attribute {key} is missing")

    def set_attribute(self, attribute_name, value):
        """Set a metadata attribute; numeric strings are coerced to float."""
        if is_number(value):
            value = float(value)

        if attribute_name in self.required_attributes:
            self.required_attributes[attribute_name] = value
        elif attribute_name in self.recommended_attributes:
            self.recommended_attributes[attribute_name] = value

        # NOTE(review): this is not an elif chain — a required/recommended
        # attribute that ACDD rejects is additionally stored in other_metadata.
        # get_attribute() checks the specific dicts first, so the duplicate is
        # invisible on lookup, but confirm the duplication is intentional.
        if not self.acdd_metadata.set_attribute(attribute_name, value):
            self.other_metadata[attribute_name] = value

    def get_attribute(self, attribute_name):
        """Return the value stored under attribute_name, or None if unknown."""
        if attribute_name in self.required_attributes:
            return self.required_attributes[attribute_name]
        elif attribute_name in self.recommended_attributes:
            return self.recommended_attributes[attribute_name]
        else:
            # NOTE(review): a falsy ACDD value (e.g. 0 or "") is skipped here
            # and lookup falls through to other_metadata — confirm intended.
            if self.acdd_metadata.get_attribute(attribute_name):
                return self.acdd_metadata.get_attribute(attribute_name)
            if attribute_name in self.other_metadata:
                return self.other_metadata[attribute_name]
            return None

    def join(self, other: "MetaDataSection"):
        """Merge attributes from other into self.

        Attributes missing in self are copied over; on conflict the value in
        self is kept and a message is printed instead of raising.
        """
        for attr_dict in [
            other.required_attributes,
            other.recommended_attributes,
            other.other_metadata,
        ]:
            for attribute, value in attr_dict.items():
                self_value = self.get_attribute(attribute)
                if value and not self_value:
                    self.set_attribute(attribute, value)
                elif value and self_value != value:
                    print(
                        f"Attribute {attribute} is different in both MetaDataSection objects"
                    )

        self.acdd_metadata.join(other.acdd_metadata)

    @property
    def metadata(self) -> dict:
        """Merged view of all metadata; required attributes are included even if None."""
        return {
            **self.required_attributes,
            **{k: v for k, v in self.recommended_attributes.items() if v},
            **{k: v for k, v in self.other_metadata.items() if v},
            **self.acdd_metadata.adjusted_dict,
        }

Instance variables

var metadata : dict
Expand source code
@property
def metadata(self) -> dict:
    """Merged view of all metadata; required attributes are included even if None."""
    return {
        **self.required_attributes,
        **{k: v for k, v in self.recommended_attributes.items() if v},
        **{k: v for k, v in self.other_metadata.items() if v},
        **self.acdd_metadata.adjusted_dict,
    }

Methods

def check_validity(self)
Expand source code
def check_validity(self):
    """Raise ValueError if any required metadata attribute is still unset."""
    for key, value in self.required_attributes.items():
        if value is None:
            raise ValueError(f"Required attribute {key} is missing")
def get_attribute(self, attribute_name)
Expand source code
def get_attribute(self, attribute_name):
    """Return the value stored under attribute_name, or None if unknown."""
    if attribute_name in self.required_attributes:
        return self.required_attributes[attribute_name]
    elif attribute_name in self.recommended_attributes:
        return self.recommended_attributes[attribute_name]
    else:
        # NOTE(review): a falsy ACDD value (e.g. 0 or "") is skipped here and
        # lookup falls through to other_metadata — confirm intended.
        if self.acdd_metadata.get_attribute(attribute_name):
            return self.acdd_metadata.get_attribute(attribute_name)
        if attribute_name in self.other_metadata:
            return self.other_metadata[attribute_name]
        return None
def init_application(self, application_profile)
Expand source code
def init_application(self, application_profile):
    """Hook for application-profile subclasses; the base class does nothing."""
    pass
def join(self, other: MetaDataSection)
Expand source code
def join(self, other: "MetaDataSection"):
    """Merge attributes from other into self.

    Attributes missing in self are copied over; on conflict the value in self
    is kept and a message is printed instead of raising.
    """
    for attr_dict in [
        other.required_attributes,
        other.recommended_attributes,
        other.other_metadata,
    ]:
        for attribute, value in attr_dict.items():
            self_value = self.get_attribute(attribute)
            if value and not self_value:
                self.set_attribute(attribute, value)
            elif value and self_value != value:
                print(
                    f"Attribute {attribute} is different in both MetaDataSection objects"
                )

    self.acdd_metadata.join(other.acdd_metadata)
def set_attribute(self, attribute_name, value)
Expand source code
def set_attribute(self, attribute_name, value):
    """Set a metadata attribute; numeric strings are coerced to float."""
    if is_number(value):
        value = float(value)

    if attribute_name in self.required_attributes:
        self.required_attributes[attribute_name] = value
    elif attribute_name in self.recommended_attributes:
        self.recommended_attributes[attribute_name] = value

    # NOTE(review): this is not an elif chain — a required/recommended
    # attribute that ACDD rejects is additionally stored in other_metadata.
    # get_attribute() checks the specific dicts first, so the duplicate is
    # invisible on lookup, but confirm the duplication is intentional.
    if not self.acdd_metadata.set_attribute(attribute_name, value):
        self.other_metadata[attribute_name] = value
class iCSVFile (filename: str = None)

Class to represent an iCSV file.

Attributes

metadata : MetadataSection
Metadata section of the iCSV file.
fields : FieldsSection
Fields section of the iCSV file.
geometry : Representation class
Geometry section of the iCSV file.
data : pd.Dataframe
Data section of the iCSV file.
filename
The name of the iCSV file.
skip_lines
The number of lines to skip when reading the file.

Methods

load_file(filename: str = None): Load an iCSV file. parse_geometry(): Parse the geometry section of the iCSV file. info(): Print a summary of the iCSV file. to_xarray(): Convert the iCSV file to an xarray dataset. setData(data: pd.DataFrame, colnames: Optional[list] = None): Set the data of the iCSV file. write(filename: str = None): Write the iCSV file to a file.

Expand source code
class iCSVFile:
    """
    Class to represent an iCSV file.

    Attributes:
        metadata (MetaDataSection): Metadata section of the iCSV file.
        fields (FieldsSection): Fields section of the iCSV file.
        geometry (Geometry): Geometry section of the iCSV file.
        data (pd.DataFrame): Data section of the iCSV file.
        filename (str): The name of the iCSV file.
        skip_lines (int): The number of lines to skip when reading the file.

    Methods:
        load_file(filename: str = None): Load an iCSV file.
        parse_geometry(): Parse the geometry section of the iCSV file.
        info(): Print a summary of the iCSV file.
        to_xarray(): Convert the iCSV file to an xarray dataset.
        setData(data: pd.DataFrame, colnames: Optional[list] = None): Set the data of the iCSV file.
        write(filename: str = None): Write the iCSV file to a file.
    """

    def __init__(self, filename: str = None):
        self.metadata = MetaDataSection()
        self.fields = FieldsSection()
        self.geometry = Geometry()
        self.data = None
        self.filename = filename
        self.skip_lines = 0  # number of header lines preceding the data section

        if self.filename:
            self.load_file()

    def __str__(self) -> str:
        return f"File: {self.filename}\n{self.metadata}\n{self.fields}\n{self.geometry}"

    def __eq__(self, value: object) -> bool:
        # The data itself is deliberately not compared: two files are equal
        # when their metadata, fields and geometry sections agree.
        try:
            for attr in ['metadata', 'fields', 'geometry']:
                if getattr(self, attr) != getattr(value, attr):
                    return False
            return True
        except AttributeError:
            return False

    def _parse_comment_line(self, line, section):
        """Handle one '#'-stripped header line: switch section or parse an attribute."""
        if line == "[METADATA]":
            return "metadata"
        elif line == "[FIELDS]":
            self.metadata.check_validity()  # to parse fields we need valid metadata
            return "fields"
        elif line == "[DATA]":
            return "data"
        else:
            return self._parse_section_line(line, section)

    def _parse_section_line(self, line, section):
        """Parse a 'key = value' line belonging to the current section."""
        if not section:
            raise ValueError("No section specified")
        line_vals = line.split("=")
        if len(line_vals) != 2:
            # Fixed message: exactly one "=" is expected per attribute line.
            raise ValueError(f"Invalid {section} line: {line}, expected exactly one assignment operator \"=\"")

        if section == "metadata":
            self.metadata.set_attribute(line_vals[0].strip(), line_vals[1].strip())
        elif section == "fields":
            fields_vec = [field.strip() for field in line_vals[1].split(self.metadata.get_attribute("field_delimiter"))]
            self.fields.set_attribute(line_vals[0].strip(), fields_vec)
        elif section == "data":
            raise TypeError("Data section should not contain any comments")

        return section

    def _update_columns(self):
        """Name the data columns and convert any time columns to datetimes."""
        self.data.columns = self.fields.fields
        for field in ["time", "timestamp"]:
            if field in self.fields.fields:
                self.data[field] = pd.to_datetime(self.data[field])

    def load_file(self, filename: str = None):
        """Loads an iCSV file and parses its contents.

        Args:
            filename (str, optional): The path to the iCSV file. If not provided, the previously set filename will be used.

        Raises:
            ValueError: If the file is not a valid iCSV file or if the data section is not specified.

        Returns:
            None
        """
        if filename:
            self.filename = filename

        section = ""
        with open(self.filename, 'r') as file:
            first_line = file.readline().rstrip()  # remove the trailing newline
            if first_line not in FIRSTLINES:
                raise ValueError("Not an iCSV file")

            line_number = 1  # find the line number where the data starts
            for line in file:
                if line.startswith("#"):
                    line_number += 1
                    line = line[1:].strip()
                    section = self._parse_comment_line(line.strip(), section)
                else:
                    # First non-comment line: [DATA] must already have been seen.
                    if section != "data":
                        raise ValueError("Data section was not specified")
                    self.skip_lines = line_number
                    break

        self.data = pd.read_csv(self.filename, skiprows=self.skip_lines, header=None, sep=self.metadata.get_attribute("field_delimiter"))
        self.fields.check_validity(self.data.shape[1])  # fields must match the number of columns
        self._update_columns()
        self.parse_geometry()

    def parse_geometry(self):
        """Fill the geometry object from the metadata section.

        If the declared geometry names a data column, only the column name is
        recorded; otherwise the location is parsed from the geometry value.
        """
        self.geometry.geometry = self.metadata.get_attribute("geometry")
        self.geometry.srid = self.metadata.get_attribute("srid")
        if self.geometry.geometry in self.fields.get_attribute("fields"):
            self.geometry.column_name = self.metadata.get_attribute("column_name")
        else:
            self.geometry.set_location()

    def info(self):
        """
        Prints information about the object and its data.

        This method prints the object itself (metadata, fields and geometry)
        followed by the head of its data.

        Returns:
            None
        """
        print(self)
        print("\nData:")
        print(self.data.head())

    def to_xarray(self) -> xr.Dataset:
        """
        Converts the data to an xarray dataset.

        Returns:
            xarray.Dataset: The converted xarray dataset.
        """
        arr = self.data.to_xarray()
        arr.attrs = self.metadata.metadata
        # Attach each per-column metadata vector (units, long_name, ...) as a
        # named attribute of the matching variable. The original assigned the
        # bare value to .attrs (clobbering the dict each iteration) and never
        # returned the dataset.
        for i, var in enumerate(arr.data_vars):
            for name, vec in self.fields.miscalleneous_fields.items():
                arr[var].attrs[name] = vec[i]
        return arr

    def setData(self, data: pd.DataFrame, colnames: Optional[list] = None):
        """
        Sets the data of the iCSV file.

        Args:
            data (pd.DataFrame): The data to set.
            colnames (list, optional): The names of the columns in the data.
                Defaults to the DataFrame's own column labels.

        Raises:
            ValueError: If colnames does not match the number of columns, or
                if no column names are available at all.

        Returns:
            None
        """
        self.data = data
        if colnames:
            if len(colnames) != self.data.shape[1]:
                raise ValueError("Number of columns in data does not match the number of column names")
        else:
            colnames = self.data.columns.to_list()
            # A default RangeIndex header means the caller never named the columns.
            if colnames[0] == "0" or colnames[0] == 0:
                raise ValueError("Column names are not provided")
        self.fields.set_attribute("fields", colnames)
        # Ensure 'timestamp' is the first column if it exists.
        if 'timestamp' in self.data.columns:
            cols = self.data.columns.tolist()
            if cols[0] != 'timestamp':
                cols.insert(0, cols.pop(cols.index('timestamp')))
                self.data = self.data[cols]
            # NOTE(review): this re-reads the DataFrame's own labels, so an
            # explicitly passed colnames list is discarded here — matches the
            # original behavior; confirm intended.
            self.fields.set_attribute("fields", self.data.columns.to_list())

    def write(self, filename: str = None):
        """
        Writes the metadata, fields, and data to a CSV file.

        Args:
            filename (str, optional): The name of the file to write. If not provided, the current filename will be used.

        Returns:
            None
        """
        if filename:
            self.filename = filename

        # Both sections must be valid before anything is written to disk.
        self.metadata.check_validity()
        self.fields.check_validity(self.data.shape[1])

        delimiter = self.metadata.get_attribute("field_delimiter")
        with open(self.filename, 'w') as file:
            file.write(f"{FIRSTLINES[-1]}\n")  # newest supported firstline marker
            file.write("# [METADATA]\n")
            for key, val in self.metadata.metadata.items():
                file.write(f"# {key} = {val}\n")
            file.write("# [FIELDS]\n")
            for key, val in self.fields.all_fields.items():
                fields_string = delimiter.join(str(value) for value in val)
                file.write(f"# {key} = {fields_string}\n")
            file.write("# [DATA]\n")

        self.data.to_csv(self.filename, mode='a', index=False, header=False, sep=delimiter)

Subclasses

Methods

def info(self)

Prints information about the object and its data.

This method prints the object itself and the head of its data.

Args

None

Returns

None

Expand source code
def info(self):
    """
    Prints information about the object and its data.

    This method prints the object itself (metadata, fields and geometry)
    followed by the head of its data.

    Returns:
        None
    """
    print(self)
    print("\nData:")
    print(self.data.head())
def load_file(self, filename: str = None)

Loads an iCSV file and parses its contents.

Args

filename : str, optional
The path to the iCSV file. If not provided, the previously set filename will be used.

Raises

ValueError
If the file is not a valid iCSV file or if the data section is not specified.

Returns

None

Expand source code
def load_file(self, filename: str = None):
    """Loads an iCSV file and parses its contents.

    Args:
        filename (str, optional): The path to the iCSV file. If not provided, the previously set filename will be used.

    Raises:
        ValueError: If the file is not a valid iCSV file or if the data section is not specified.

    Returns:
        None
    """
    if filename:
        self.filename = filename

    section = ""
    with open(self.filename, 'r') as file:
        first_line = file.readline().rstrip()  # rstrip() is used to remove the trailing newline
        if first_line not in FIRSTLINES:
            raise ValueError("Not an iCSV file")

        line_number = 1 # need to find the line number where the data starts
        # Header lines are '#'-prefixed; the first unprefixed line starts the data.
        for line in file:
            if line.startswith("#"):
                line_number += 1
                line = line[1:].strip()
                section = self._parse_comment_line(line.strip(), section)
            else:
                # Reaching data without having seen [DATA] is an error.
                if section != "data":
                    raise ValueError("Data section was not specified")
                self.skip_lines = line_number
                break

    self.data = pd.read_csv(self.filename, skiprows=self.skip_lines, header=None, sep=self.metadata.get_attribute("field_delimiter"))
    self.fields.check_validity(self.data.shape[1]) # check if the number of fields match the number of columns
    self._update_columns()
    self.parse_geometry()
def parse_geometry(self)
Expand source code
def parse_geometry(self):
    """Fill the geometry object from the metadata section.

    If the declared geometry names a data column, only the column name is
    recorded; otherwise the location is parsed from the geometry value.
    """
    if self.metadata.get_attribute("geometry") in self.fields.get_attribute("fields"):
        self.geometry.geometry = self.metadata.get_attribute("geometry")
        self.geometry.srid = self.metadata.get_attribute("srid")
        self.geometry.column_name = self.metadata.get_attribute("column_name")
    else:
        self.geometry.geometry = self.metadata.get_attribute("geometry")
        self.geometry.srid = self.metadata.get_attribute("srid")
        self.geometry.set_location()
def setData(self, data: pandas.core.frame.DataFrame, colnames: Optional[list] = None)

Sets the data of the iCSV file.

Args

data : pd.DataFrame
The data to set.
colnames : list
The names of the columns in the data.

Returns

None

Expand source code
def setData(self, data: pd.DataFrame, colnames: Optional[list] = None):
    """
    Sets the data of the iCSV file.

    Args:
        data (pd.DataFrame): The data to set.
        colnames (list): The names of the columns in the data.

    Returns:
        None
    """
    self.data = data
    if colnames:
        if len(colnames) != self.data.shape[1]:
            raise ValueError("Number of columns in data does not match the number of column names")
        self.fields.set_attribute("fields", colnames)
    else:
        colnames = self.data.columns.to_list()
        # A default RangeIndex header means the caller never named the columns.
        if colnames[0] == "0" or colnames[0] == 0:
            raise ValueError("Column names are not provided")
        self.fields.set_attribute("fields", colnames)
    # Ensure 'timestamp' is the first column if it exists.
    if 'timestamp' in self.data.columns:
        cols = self.data.columns.tolist()
        if cols[0] != 'timestamp':
            cols.insert(0, cols.pop(cols.index('timestamp')))
            self.data = self.data[cols]
        # NOTE(review): this re-reads the DataFrame's own labels, so an
        # explicitly passed colnames list is discarded here — confirm intended.
        self.fields.set_attribute("fields", self.data.columns)
def to_xarray(self) ‑> xarray.core.dataset.Dataset

Converts the data to an xarray dataset.

Returns

xarray.Dataset
The converted xarray dataset.
Expand source code
def to_xarray(self) -> xr.Dataset:
    """
    Converts the data to an xarray dataset.

    The file-level metadata is attached as dataset attributes, and each
    entry of the fields section's miscellaneous fields is attached as a
    named attribute on the corresponding data variable.

    Returns:
        xarray.Dataset: The converted xarray dataset.
    """
    arr = self.data.to_xarray()
    arr.attrs = self.metadata.metadata
    for i, var in enumerate(arr.data_vars):
        # Bug fix: the original did `arr[var].attrs = vec[i]`, which replaced
        # the attrs dict with a scalar on every iteration and kept only the
        # last miscellaneous field; set a keyed attribute per field instead.
        for key, vec in self.fields.miscalleneous_fields.items():
            arr[var].attrs[key] = vec[i]
    # Bug fix: the original had no return statement despite the documented
    # xarray.Dataset return value.
    return arr
def write(self, filename: str = None)

Writes the metadata, fields, and data to a CSV file.

Args

filename : str, optional
The name of the file to write. If not provided, the current filename will be used.

Returns

None

Expand source code
def write(self, filename: str = None):
    """
    Writes the metadata, fields, and data to a CSV file.

    Args:
        filename (str, optional): The name of the file to write. If not provided, the current filename will be used.

    Returns:
        None
    """
    if filename:
        self.filename = filename

    # Validate before touching the output file.
    self.metadata.check_validity()
    self.fields.check_validity(self.data.shape[1])

    delimiter = self.metadata.get_attribute("field_delimiter")

    # Assemble the commented header (firstline, metadata, fields, data marker)
    # and write it in one go, then append the data rows below it.
    header_lines = [f"{FIRSTLINES[-1]}\n", "# [METADATA]\n"]
    header_lines.extend(f"# {key} = {val}\n" for key, val in self.metadata.metadata.items())
    header_lines.append("# [FIELDS]\n")
    for key, val in self.fields.all_fields.items():
        fields_string = delimiter.join(str(value) for value in val)
        header_lines.append(f"# {key} = {fields_string}\n")
    header_lines.append("# [DATA]\n")

    with open(self.filename, 'w') as out:
        out.writelines(header_lines)

    self.data.to_csv(self.filename, mode='a', index=False, header=False, sep=delimiter)
class iCSVProfiles (filename: str = None)

Class to represent an iCSV file containing snow profile data.

The iCSVProfiles extends the iCSVFile class to handle the specific structure and requirements of snow profile data, which includes multiple timestamped profile measurements and point measurements.

Attributes (additional to iCSVFile):

- dates (list): List of datetime objects representing measurement dates in the file.
- date_lines (list): List of line numbers where date entries begin in the file.
- data (dict): Dictionary mapping datetime objects to pandas DataFrames containing the profile data for that timestamp.

Key Features:

- Handles multiple time-stamped profiles in a single file
- Separates point measurements from profile measurements
- Provides methods to extract and filter point and profile values
- Can convert to xarray Dataset for multi-dimensional data analysis

The snowprofile format follows the iCSV specification with the addition of [DATE=timestamp] markers in the data section to separate measurements from different dates. Each profile must include a 'layer_number' field to identify profile layers versus point measurements.

Expand source code
class iCSVProfiles(iCSVFile):
    """
    Class to represent an iCSV file containing snow profile data.

    The iCSVProfiles extends the iCSVFile class to handle the specific structure
    and requirements of snow profile data, which includes multiple timestamped
    profile measurements and point measurements.

    Attributes (additional to iCSVFile):
        dates (list): List of datetime objects representing measurement dates in the file.
        date_lines (list): List of line numbers where date entries begin in the file.
        data (dict): Dictionary mapping datetime objects to pandas DataFrames containing
                    the profile data for that timestamp.

    Key Features:
        - Handles multiple time-stamped profiles in a single file
        - Separates point measurements from profile measurements
        - Provides methods to extract and filter point and profile values
        - Can convert to xarray Dataset for multi-dimensional data analysis

    The snowprofile format follows the iCSV specification with the addition of
    [DATE=timestamp] markers in the data section to separate measurements from
    different dates. Each profile must include a 'layer_number' field to identify
    profile layers versus point measurements.
    """
    def __init__(self, filename: str = None):
        # Date bookkeeping must exist before the base constructor runs, since
        # it may trigger load_file(), which appends to these lists.
        self.dates = []
        self.date_lines = []
        super().__init__(filename)

    def _parse_comment_line(self, line, section, line_number):
        """Handles one '#'-prefixed line: switches section on a marker,
        otherwise delegates to _parse_section_line."""
        if line == "[METADATA]":
            return "metadata"
        elif line == "[FIELDS]":
            self.metadata.check_validity()  # to parse fields we need valid metadata
            return "fields"
        elif line == "[DATA]":
            return "data"
        else:
            return self._parse_section_line(line, section, line_number)

    def _parse_section_line(self, line, section, line_number):
        """Parses a 'key = value' line inside the current section.

        In the data section only '[DATE=...]' marker lines are accepted; their
        timestamps and 1-based line numbers are recorded so load_file can later
        slice the file into per-date DataFrames.

        Raises:
            ValueError: If no section is active, the line is not a single
                'key = value' assignment, or a data-section line is not a
                '[DATE=...]' marker.
        """
        if not section:
            raise ValueError("No section specified")
        line_vals = line.split("=")
        if len(line_vals) != 2:
            # Fixed message: the check demands exactly one "=" on the line
            # (the old text claimed "got 2 assignment operators").
            raise ValueError(f"Invalid {section} line: {line}, expected exactly one assignment operator \"=\"")

        if section == "metadata":
            self.metadata.set_attribute(line_vals[0].strip(), line_vals[1].strip())
        elif section == "fields":
            fields_vec = [field.strip() for field in line_vals[1].split(self.metadata.get_attribute("field_delimiter"))]
            self.fields.set_attribute(line_vals[0].strip(), fields_vec)
        elif section == "data":
            if "[DATE=" in line:
                date_str = line.split('[DATE=')[1].split(']')[0].strip()
                self.dates.append(datetime.datetime.fromisoformat(date_str))
                self.date_lines.append(line_number)
            else:
                raise ValueError(f"Invalid data line: {line}")

        return section

    def load_file(self, filename: str = None):
        """Loads an iCSV file and parses its contents, extracting the dates and data lines for a snow profile.

        Args:
            filename (str, optional): The path to the iCSV file. If not provided, the previously set filename will be used.

        Raises:
            ValueError: If the file is not a valid iCSV file or if the data section is not specified.

        Returns:
            None
        """
        self.data = dict()
        if filename:
            self.filename = filename

        section = ""
        with open(self.filename, 'r') as file:
            first_line = file.readline().rstrip()  # rstrip() is used to remove the trailing newline
            if first_line not in FIRSTLINES_SNOWPROFILE:
                raise ValueError("Not an iCSV file with the snowprofile application profile")

            line_number = 1 # need to find the line number where the data starts
            for line in file:
                line_number += 1
                if line.startswith("#"):
                    line = line[1:].strip()
                    section = self._parse_comment_line(line.strip(), section, line_number)
                else:
                    # Non-comment lines are data rows and are only valid once
                    # the [DATA] marker has been seen.
                    if section != "data":
                        raise ValueError("Data section was not specified")

        # Slice the file into one DataFrame per [DATE=...] marker: skiprows
        # skips everything up to and including the marker line, nrows covers
        # the rows until the next marker (or EOF for the last block).
        for (i, date) in enumerate(self.dates):
            first_data_line = self.date_lines[i]
            last_data_line = self.date_lines[i+1] if i+1 < len(self.dates) else line_number + 1
            self.data[date] = pd.read_csv(self.filename, skiprows=first_data_line, nrows=last_data_line-first_data_line-1, header=None, sep=self.metadata.get_attribute("field_delimiter"))
            self.data[date].columns = self.fields.fields

        self.fields.check_validity(self.data[self.dates[0]].shape[1]) # check if the number of fields match the number of columns
        self.parse_geometry()

    def info(self):
        """
        Prints a summary of the profile file.

        Prints the object itself, the list of measurement dates, and the head
        of the first profile.

        Returns:
            None
        """
        print(self)
        print("\nDates:")
        print(self.dates)
        print("\nFirst Profile:")
        print(self.data[self.dates[0]].head())

    def to_xarray(self):
        """
        Converts the data to a single 3D xarray Dataset with 'time' as one dimension.

        NOTE(review): xr.concat aligns the per-date arrays on their
        'layer_number' index — presumably all profiles share the same layers;
        confirm how differing layer sets should be handled.

        Returns:
            xarray.Dataset: The combined xarray dataset.
        """

        # Convert each DataFrame to an xarray Dataset indexed by layer_number
        arrays = []
        for date in self.dates:
            df = self.data[date].copy()
            df.set_index("layer_number", inplace=True)
            arrays.append(df.to_xarray())
        # Concatenate along a new time dimension and attach file metadata
        ds = xr.concat(arrays, dim="time")
        ds = ds.assign_coords(time=self.dates)
        ds.attrs = self.metadata.metadata
        return ds

    def setData(self, timestamp: datetime.datetime, data: pd.DataFrame, colnames: Optional[list] = None):
        """
        Registers a profile DataFrame for the given timestamp.

        Args:
            timestamp (datetime.datetime): Measurement date of the profile.
            data (pd.DataFrame): The profile data for that timestamp.
            colnames (list, optional): Currently unused — TODO: confirm whether
                column names should be applied to the DataFrame here.

        Returns:
            None
        """
        if not self.data:
            self.data = dict()
        self.dates.append(timestamp)
        self.data[timestamp] = data

    def write(self, filename: str = None):
        """
        Writes the metadata, fields, and data to a CSV file.

        Args:
            filename (str, optional): The name of the file to write. If not provided, the current filename will be used.

        Raises:
            ValueError: If the required 'model' metadata or 'layer_number'
                field of the Snowprofile application profile is missing.

        Returns:
            None
        """

        if filename:
            self.filename = filename

        self.metadata.check_validity()
        if "model" not in self.metadata.metadata:
            raise ValueError("model is a required metadata for the Snowprofile application profile")
        first_key = self.dates[0]
        self.fields.check_validity(self.data[first_key].shape[1])
        if "layer_number" not in self.fields.fields:
            raise ValueError("layer_number is a required field for the Snowprofile application profile")

        with open(self.filename, 'w') as file:
            file.write(f"{FIRSTLINES_SNOWPROFILE[-1]}\n")
            file.write("# [METADATA]\n")
            for key, val in self.metadata.metadata.items():
                file.write(f"# {key} = {val}\n")
            file.write("# [FIELDS]\n")
            for key, val in self.fields.all_fields.items():
                fields_string = self.metadata.get_attribute("field_delimiter").join(str(value) for value in val)
                file.write(f"# {key} = {fields_string}\n")
            file.write("# [DATA]\n")
            # Each profile block is appended directly after its [DATE=...] marker
            for date in self.dates:
                file.write(f"# [DATE={date.isoformat()}]\n")
                self.data[date].to_csv(file, mode='a', index=False, header=False, sep=self.metadata.get_attribute("field_delimiter"))

    def get_point_values(self):
        """
        Extracts point measurements from the snow profile data.

        Retrieves all data rows where 'layer_number' equals 'point' across all dates,
        combining them into a single DataFrame with timestamps. Automatically filters
        out columns that contain only nodata values.

        Returns:
            pandas.DataFrame: DataFrame containing all point measurements with timestamps,
                             with columns filtered to include only those with valid data.
                             Returns an empty DataFrame if no point measurements are found.
        """

        nodata = self.metadata.get_attribute("nodata")
        point_rows = []
        for date in self.dates:
            data_at_date = self.data[date]
            if "point" not in data_at_date["layer_number"].values:
                continue
            point_df = data_at_date[data_at_date["layer_number"] == "point"].copy()
            point_df.insert(0, "timestamp", date)
            point_rows.append(point_df)
        if not point_rows:
            # Bug fix: return a bare empty DataFrame, matching the documented
            # contract (previously returned a (DataFrame, list) tuple,
            # inconsistent with the non-empty return type).
            return pd.DataFrame()

        all_points = pd.concat(point_rows, ignore_index=True)
        # Exclude the 'timestamp' and 'layer_number' columns from nodata filtering
        cols_to_check = [col for col in all_points.columns if col not in ["timestamp", "layer_number"]]
        # Keep only columns with at least one valid (non-nodata) value
        valid_cols = [col for col in cols_to_check if not (all_points[col] == nodata).all()]
        # Always keep 'timestamp' and 'layer_number' (bug fix: 'layer_number'
        # was dropped despite the comment above saying to keep it)
        final_cols = ["timestamp", "layer_number"] + valid_cols
        filtered_points = all_points[final_cols]
        return filtered_points

    def get_profile_values(self, as_xarray=False):
        """
        Extracts profile measurements from the snow profile data.

        Retrieves all data rows where 'layer_number' is not 'point' across all dates,
        organizing them by timestamp. Automatically filters out columns containing only
        nodata values for each profile.

        Args:
            as_xarray (bool, optional): If True, returns data as an xarray Dataset with
                                       dimensions (time, layer) instead of a dictionary.
                                       Default is False.

        Returns:
            dict or xarray.Dataset: If as_xarray=False (default), returns a dictionary mapping
                                   datetime objects to pandas DataFrames with layer_number as index.
                                   If as_xarray=True, returns a multi-dimensional xarray Dataset with
                                   dimensions of time and layer, and variables for each measurement type.
        """

        NODATA = self.metadata.get_attribute("nodata")
        profiles_dict = {}
        for date in self.dates:
            data_at_date = self.data[date]
            profile_df = data_at_date[data_at_date["layer_number"] != "point"].copy()
            colnames = profile_df.columns
            # Drop columns that contain only nodata values.  Assumes the
            # 'layer_number' column itself never consists entirely of nodata.
            valid_cols = [col for col in colnames if not (profile_df[col] == NODATA).all()]
            filtered_df = profile_df[valid_cols]
            profiles_dict[date] = filtered_df.set_index("layer_number")
        if as_xarray and profiles_dict:
            # Convert to xarray Dataset: dims=time, layer, variables=fields
            # Find union of all columns
            all_vars = set()
            for df in profiles_dict.values():
                all_vars.update(df.columns)
            all_vars = sorted(all_vars)
            # Build a 3D array: (time, layer, variable)
            times = list(profiles_dict.keys())
            max_layers = max(len(df) for df in profiles_dict.values())
            data = {var: [] for var in all_vars}
            layer_numbers = []
            for date in times:
                df = profiles_dict[date]
                # Pad shorter profiles so every (time, layer) slice has equal length
                layer_numbers.append(df.index.tolist() + [None]*(max_layers - len(df)))
                for var in all_vars:
                    col_data = df[var].tolist() if var in df.columns else [NODATA]*len(df)
                    col_data += [NODATA]*(max_layers - len(col_data))
                    data[var].append(col_data)
            ds = xr.Dataset(
                {var: (['time', 'layer'], data[var]) for var in all_vars},
                coords={
                    'time': times,
                    'layer': range(max_layers),
                    'layer_number': (['time', 'layer'], layer_numbers)
                }
            )
            return ds
        return profiles_dict

Ancestors

Methods

def get_point_values(self)

Extracts point measurements from the snow profile data.

Retrieves all data rows where 'layer_number' equals 'point' across all dates, combining them into a single DataFrame with timestamps. Automatically filters out columns that contain only nodata values.

Returns

pandas.DataFrame
DataFrame containing all point measurements with timestamps, with columns filtered to include only those with valid data. Returns an empty DataFrame if no point measurements are found.
Expand source code
def get_point_values(self):
    """
    Extracts point measurements from the snow profile data.

    Retrieves all data rows where 'layer_number' equals 'point' across all dates,
    combining them into a single DataFrame with timestamps. Automatically filters
    out columns that contain only nodata values.

    Returns:
        pandas.DataFrame: DataFrame containing all point measurements with timestamps,
                         with columns filtered to include only those with valid data.
                         Returns an empty DataFrame if no point measurements are found.
    """

    nodata = self.metadata.get_attribute("nodata")
    point_rows = []
    for date in self.dates:
        data_at_date = self.data[date]
        if "point" not in data_at_date["layer_number"].values:
            continue
        point_df = data_at_date[data_at_date["layer_number"] == "point"].copy()
        point_df.insert(0, "timestamp", date)
        point_rows.append(point_df)
    if not point_rows:
        # Bug fix: return a bare empty DataFrame, matching the documented
        # contract (previously returned a (DataFrame, list) tuple,
        # inconsistent with the non-empty return type).
        return pd.DataFrame()

    all_points = pd.concat(point_rows, ignore_index=True)
    # Exclude the 'timestamp' and 'layer_number' columns from nodata filtering
    cols_to_check = [col for col in all_points.columns if col not in ["timestamp", "layer_number"]]
    # Keep only columns with at least one valid (non-nodata) value
    valid_cols = [col for col in cols_to_check if not (all_points[col] == nodata).all()]
    # Always keep 'timestamp' and 'layer_number' (bug fix: 'layer_number'
    # was dropped despite the comment above saying to keep it)
    final_cols = ["timestamp", "layer_number"] + valid_cols
    filtered_points = all_points[final_cols]
    return filtered_points
def get_profile_values(self, as_xarray=False)

Extracts profile measurements from the snow profile data.

Retrieves all data rows where 'layer_number' is not 'point' across all dates, organizing them by timestamp. Automatically filters out columns containing only nodata values for each profile.

Args

as_xarray : bool, optional
If True, returns data as an xarray Dataset with dimensions (time, layer) instead of a dictionary. Default is False.

Returns

dict or xarray.Dataset
If as_xarray=False (default), returns a dictionary mapping datetime objects to pandas DataFrames with layer_number as index. If as_xarray=True, returns a multi-dimensional xarray Dataset with dimensions of time and layer, and variables for each measurement type.
Expand source code
def get_profile_values(self, as_xarray: bool = False):
    """
    Extracts profile measurements from the snow profile data.

    Retrieves all data rows where 'layer_number' is not 'point' across all dates,
    organizing them by timestamp. Automatically filters out columns containing only
    nodata values for each profile.

    Args:
        as_xarray (bool, optional): If True, returns data as an xarray Dataset with
                                   dimensions (time, layer) instead of a dictionary.
                                   Default is False.

    Returns:
        dict or xarray.Dataset: If as_xarray=False (default), returns a dictionary mapping
                               datetime objects to pandas DataFrames with layer_number as index.
                               If as_xarray=True, returns a multi-dimensional xarray Dataset with
                               dimensions of time and layer, and variables for each measurement type.
    """

    NODATA = self.metadata.get_attribute("nodata")
    profiles_dict = {}
    for date in self.dates:
        data_at_date = self.data[date]
        profile_df = data_at_date[data_at_date["layer_number"] != "point"].copy()
        colnames = profile_df.columns
        # Drop columns that contain only nodata values.  NOTE(review): this
        # includes 'layer_number' itself — presumably its layer ids are never
        # all equal to nodata; confirm.
        valid_cols = [col for col in colnames if not (profile_df[col] == NODATA).all()]
        filtered_df = profile_df[valid_cols]
        profiles_dict[date] = filtered_df.set_index("layer_number")
    if as_xarray and profiles_dict:
        # Convert to xarray Dataset: dims=time, layer, variables=fields
        # Union of all column names across dates (columns may differ after
        # per-date nodata filtering)
        all_vars = set()
        for df in profiles_dict.values():
            all_vars.update(df.columns)
        all_vars = sorted(all_vars)
        # Build a 3D array: (time, layer, variable), padded to the deepest profile
        times = list(profiles_dict.keys())
        max_layers = max(len(df) for df in profiles_dict.values())
        data = {var: [] for var in all_vars}
        layer_numbers = []
        for date in times:
            df = profiles_dict[date]
            # Pad shorter profiles: layer ids with None, values with NODATA
            layer_numbers.append(df.index.tolist() + [None]*(max_layers - len(df)))
            for var in all_vars:
                col_data = df[var].tolist() if var in df.columns else [NODATA]*len(df)
                col_data += [NODATA]*(max_layers - len(col_data))
                data[var].append(col_data)
        ds = xr.Dataset(
            {var: (['time', 'layer'], data[var]) for var in all_vars},
            coords={
                'time': times,
                'layer': range(max_layers),
                'layer_number': (['time', 'layer'], layer_numbers)
            }
        )
        return ds
    return profiles_dict
def load_file(self, filename: str = None)

Loads an iCSV file and parses its contents, extracting the dates and data lines for a snow profile.

Args

filename : str, optional
The path to the iCSV file. If not provided, the previously set filename will be used.

Raises

ValueError
If the file is not a valid iCSV file or if the data section is not specified.

Returns

None

Expand source code
def load_file(self, filename: str = None):
    """Loads an iCSV file and parses its contents, extracting the dates and data lines for a snow profile.

    Args:
        filename (str, optional): The path to the iCSV file. If not provided, the previously set filename will be used.

    Raises:
        ValueError: If the file is not a valid iCSV file or if the data section is not specified.

    Returns:
        None
    """
    # Reset previously loaded data; profiles are stored keyed by timestamp.
    self.data = dict()
    if filename:
        self.filename = filename

    section = ""
    with open(self.filename, 'r') as file:
        first_line = file.readline().rstrip()  # rstrip() is used to remove the trailing newline
        if first_line not in FIRSTLINES_SNOWPROFILE:
            raise ValueError("Not an iCSV file with the snowprofile application profile")

        line_number = 1 # need to find the line number where the data starts
        for line in file:
            line_number += 1
            if line.startswith("#"):
                line = line[1:].strip()
                section = self._parse_comment_line(line.strip(), section, line_number)
            else:
                # Non-comment lines are data rows; only valid after [DATA]
                if section != "data":
                    raise ValueError("Data section was not specified")


    # Slice the file into one DataFrame per [DATE=...] marker.  date_lines
    # holds the 1-based marker line numbers, so skiprows=marker skips the
    # header plus the marker itself; the final block runs to end of file.
    for (i, date) in enumerate(self.dates):
        first_data_line = self.date_lines[i]
        last_data_line = self.date_lines[i+1] if i+1 < len(self.dates) else line_number + 1
        self.data[date] = pd.read_csv(self.filename, skiprows=first_data_line, nrows=last_data_line-first_data_line-1, header=None, sep=self.metadata.get_attribute("field_delimiter"))
        self.data[date].columns = self.fields.fields

    # NOTE(review): raises IndexError if the file holds no [DATE=...] markers
    # (self.dates empty) — confirm whether that case should be handled.
    self.fields.check_validity(self.data[self.dates[0]].shape[1]) # check if the number of fields match the number of columns
    self.parse_geometry()
def to_xarray(self)

Converts the data to a single 3D xarray Dataset with 'time' as one dimension.

Returns

xarray.Dataset
The combined xarray dataset.
Expand source code
def to_xarray(self):
    """
    Converts the data to a single 3D xarray Dataset with 'time' as one dimension.

    Returns:
        xarray.Dataset: The combined xarray dataset.
    """

    # One Dataset per date, indexed by layer_number (set_index returns a
    # copy, so the stored DataFrames are left untouched)
    per_date = [
        self.data[date].set_index("layer_number").to_xarray()
        for date in self.dates
    ]
    # Stack along a new time dimension and attach the file-level metadata
    combined = xr.concat(per_date, dim="time")
    combined = combined.assign_coords(time=self.dates)
    combined.attrs = self.metadata.metadata
    return combined

Inherited members