datavault

DataVault

Bases: object

Source code in hdxms_datasets/datavault.py
class DataVault(object):
    def __init__(
        self,
        cache_dir: Union[Path, str],
        remote_url: str = DATABASE_URL,
    ):
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(exist_ok=True, parents=True)

        self.remote_url = remote_url
        self.remote_index: Optional[pd.DataFrame] = None

    def filter(self, *spec: dict):
        # filters list of available datasets
        raise NotImplementedError("Not yet implemented")

    def get_index(self, on_error="ignore") -> Optional[pd.DataFrame]:
        """Retrieves the index of available datasets

        on success, returns the index dataframe and
        stores as `remote_index` attribute.

        """

        url = urllib.parse.urljoin(self.remote_url, "index.csv")
        try:
            index_df = pd.read_csv(url)
            self.remote_index = index_df
            return index_df

        except urllib.error.HTTPError as err:
            if on_error == "ignore":
                pass
            elif on_error == "warn":
                warnings.warn(f"Error loading index: {err}")
            else:
                raise err

    @property
    def datasets(self) -> list[str]:
        """List of available datasets in the cache dir"""
        return [d.stem for d in self.cache_dir.iterdir() if self.is_dataset(d)]

    @staticmethod
    def is_dataset(path: Path) -> bool:
        """
        Checks if the supplied path is an HDX-MS dataset.
        """

        return (path / "hdx_spec.yaml").exists()

    async def fetch_datasets(self, n: Optional[int] = None, data_ids: Optional[list[str]] = None):
        """
        Asynchronously download multiple datasets
        """
        raise NotImplementedError("Not yet implemented")

        if n is None and data_ids is None:
            n = 10

        if data_ids:
            # Download specified datasets to cache_dir
            ...

        elif n:
            # Download n new datasets to cache_dir
            ...

    def fetch_dataset(self, data_id: str) -> bool:
        """
        Download a dataset from the online repository to the cache dir

        Args:
            data_id: The ID of the dataset to download.

        Returns:
            `True` if the dataset was downloaded successfully, `False` otherwise.
        """

        output_pth = self.cache_dir / data_id
        if output_pth.exists():
            return False
        else:
            output_pth.mkdir()

        dataset_url = urllib.parse.urljoin(self.remote_url, data_id + "/")

        files = ["hdx_spec.yaml", "metadata.yaml"]
        optional_files = ["CITATION.cff"]
        hdx_spec = None
        for f in files + optional_files:
            url = urllib.parse.urljoin(dataset_url, f)
            response = requests.get(url)

            if response.ok:
                (output_pth / f).write_bytes(response.content)

            elif f in files:
                raise urllib.error.HTTPError(
                    url,
                    response.status_code,
                    f"Error for file {f!r}",
                    response.headers,  # type: ignore
                    None,
                )

            if f == "hdx_spec.yaml":
                hdx_spec = yaml.safe_load(response.text)

        if hdx_spec is None:
            raise ValueError(f"Could not find HDX spec for data_id {data_id!r}")

        data_pth = output_pth / "data"
        data_pth.mkdir()

        for file_spec in hdx_spec["data_files"].values():
            filename = file_spec["filename"]
            f_url = urllib.parse.urljoin(dataset_url, filename)
            response = requests.get(f_url)

            if response.ok:
                (output_pth / filename).write_bytes(response.content)
            else:
                raise urllib.error.HTTPError(
                    f_url,
                    response.status_code,
                    f"Error for data file {filename!r}",
                    response.headers,  # type: ignore
                    None,
                )

        return True

    def clear_cache(self) -> None:
        for path in self.cache_dir.iterdir():
            shutil.rmtree(path)

    def get_metadata(self, data_id: str) -> dict:
        return yaml.safe_load((self.cache_dir / data_id / "metadata.yaml").read_text())

    def load_dataset(self, data_id: str) -> DataSet:
        hdx_spec = yaml.safe_load((self.cache_dir / data_id / "hdx_spec.yaml").read_text())
        dataset_metadata = self.get_metadata(data_id)

        return DataSet.from_spec(
            hdx_spec=hdx_spec,
            data_dir=self.cache_dir / data_id,
            data_id=data_id,
            metadata=dataset_metadata,
        )
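
A minimal usage sketch, assuming DataVault is imported from the hdxms_datasets.datavault module shown above; the cache directory and dataset ID below are placeholders:

from hdxms_datasets.datavault import DataVault

# Create a vault backed by a local cache directory (created if missing).
vault = DataVault(cache_dir="./datasets_cache")  # placeholder cache location

data_id = "some_dataset_id"  # placeholder dataset ID taken from the remote index
vault.fetch_dataset(data_id)           # downloads spec, metadata and data files into the cache
dataset = vault.load_dataset(data_id)  # builds a DataSet from the cached hdx_spec.yaml and metadata.yaml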

datasets: list[str] property

List of available datasets in the cache dir
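
For example, the property can be used to iterate over cached datasets; the cache path below is a placeholder:

from hdxms_datasets.datavault import DataVault

vault = DataVault(cache_dir="./datasets_cache")  # placeholder cache location
for data_id in vault.datasets:              # only directories containing hdx_spec.yaml are listed
    metadata = vault.get_metadata(data_id)  # contents of metadata.yaml as a dict
    print(data_id, sorted(metadata))        # dataset ID and its metadata keys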

fetch_dataset(data_id)

Download a dataset from the online repository to the cache dir

Parameters:

    Name      Type   Description                          Default
    data_id   str    The ID of the dataset to download.   required

Returns:

    Type   Description
    bool   True if the dataset was downloaded successfully, False otherwise.

Source code in hdxms_datasets/datavault.py
def fetch_dataset(self, data_id: str) -> bool:
    """
    Download a dataset from the online repository to the cache dir

    Args:
        data_id: The ID of the dataset to download.

    Returns:
        `True` if the dataset was downloaded successfully, `False` otherwise.
    """

    output_pth = self.cache_dir / data_id
    if output_pth.exists():
        return False
    else:
        output_pth.mkdir()

    dataset_url = urllib.parse.urljoin(self.remote_url, data_id + "/")

    files = ["hdx_spec.yaml", "metadata.yaml"]
    optional_files = ["CITATION.cff"]
    hdx_spec = None
    for f in files + optional_files:
        url = urllib.parse.urljoin(dataset_url, f)
        response = requests.get(url)

        if response.ok:
            (output_pth / f).write_bytes(response.content)

        elif f in files:
            raise urllib.error.HTTPError(
                url,
                response.status_code,
                f"Error for file {f!r}",
                response.headers,  # type: ignore
                None,
            )

        if f == "hdx_spec.yaml":
            hdx_spec = yaml.safe_load(response.text)

    if hdx_spec is None:
        raise ValueError(f"Could not find HDX spec for data_id {data_id!r}")

    data_pth = output_pth / "data"
    data_pth.mkdir()

    for file_spec in hdx_spec["data_files"].values():
        filename = file_spec["filename"]
        f_url = urllib.parse.urljoin(dataset_url, filename)
        response = requests.get(f_url)

        if response.ok:
            (output_pth / filename).write_bytes(response.content)
        else:
            raise urllib.error.HTTPError(
                f_url,
                response.status_code,
                f"Error for data file {filename!r}",
                response.headers,  # type: ignore
                None,
            )

    return True
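
A short sketch of the return-value semantics, using a placeholder dataset ID and cache path:

from hdxms_datasets.datavault import DataVault

vault = DataVault(cache_dir="./datasets_cache")  # placeholder cache location
data_id = "some_dataset_id"                      # placeholder dataset ID

first = vault.fetch_dataset(data_id)   # True if the files were downloaded into the cache
second = vault.fetch_dataset(data_id)  # False: the dataset directory already exists, nothing is re-downloaded
print(first, second)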

fetch_datasets(n=None, data_ids=None) async

Asynchronously download multiple datasets

Source code in hdxms_datasets/datavault.py
async def fetch_datasets(self, n: Optional[int] = None, data_ids: Optional[list[str]] = None):
    """
    Asynchronously download multiple datasets
    """
    raise NotImplementedError("Not yet implemented")

    if n is None and data_ids is None:
        n = 10

    if data_ids:
        # Download specified datasets to cache_dir
        ...

    elif n:
        # Download n new datasets to cache_dir
        ...

get_index(on_error='ignore')

Retrieves the index of available datasets.

On success, returns the index DataFrame and stores it as the remote_index attribute.

Source code in hdxms_datasets/datavault.py
def get_index(self, on_error="ignore") -> Optional[pd.DataFrame]:
    """Retrieves the index of available datasets

    on success, returns the index dataframe and
    stores as `remote_index` attribute.

    """

    url = urllib.parse.urljoin(self.remote_url, "index.csv")
    try:
        index_df = pd.read_csv(url)
        self.remote_index = index_df
        return index_df

    except urllib.error.HTTPError as err:
        if on_error == "ignore":
            pass
        elif on_error == "warn":
            warnings.warn(f"Error loading index: {err}")
        else:
            raise err
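
A sketch of how the error handling modes behave; the cache path below is a placeholder:

from hdxms_datasets.datavault import DataVault

vault = DataVault(cache_dir="./datasets_cache")  # placeholder cache location

index = vault.get_index()  # default on_error="ignore": returns None if the download fails
if index is not None:
    print(index.head())                 # pandas DataFrame read from <remote_url>/index.csv
    assert vault.remote_index is index  # the result is also stored on the instance

# on_error="warn" emits a warning and returns None on HTTP errors;
# any other value re-raises the HTTPError.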

is_dataset(path) staticmethod

Checks if the supplied path is an HDX-MS dataset.

Source code in hdxms_datasets/datavault.py
@staticmethod
def is_dataset(path: Path) -> bool:
    """
    Checks if the supplied path is an HDX-MS dataset.
    """

    return (path / "hdx_spec.yaml").exists()
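
For example, is_dataset can be used to inspect arbitrary directories; the path below is a placeholder and is assumed to exist:

from pathlib import Path
from hdxms_datasets.datavault import DataVault

cache_dir = Path("./datasets_cache")  # placeholder directory
for path in cache_dir.iterdir():
    # A directory is considered an HDX-MS dataset if it contains hdx_spec.yaml.
    print(path.name, DataVault.is_dataset(path))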