Skip to content

Datadancer

Datenverarbeitung und Analyse.

API-Referenz

datadancer

datadancer — High-level data fetching and DataFrame conversion.

Schnellstart

from datadancer import SmardClient, Filter, Region, Resolution

async with SmardClient() as client:
    df = await client.fetch_latest(Filter.WIND_ONSHORE, hours=48)
    print(df.tail())

Filter

Bases: IntEnum

SMARD Filter-IDs — entsprechen den Datenkategorien.

Source code in datadancer/sources/smard/constants.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
class Filter(IntEnum):
    """SMARD filter IDs — each member identifies one data category."""

    # Realised generation
    BIOMASSE = 1223
    WASSERKRAFT = 4066
    WIND_ONSHORE = 1225
    WIND_OFFSHORE = 4067
    PHOTOVOLTAIK = 4068
    SONSTIGE_ERNEUERBARE = 1228
    BRAUNKOHLE = 1224
    STEINKOHLE = 1227
    ERDGAS = 1226
    PUMPSPEICHER_EINSPEISUNG = 1229
    SONSTIGE_KONVENTIONELLE = 1230
    KERNENERGIE = 1221

    # Generation forecasts
    PROGNOSE_WIND_ONSHORE = 3791
    PROGNOSE_WIND_OFFSHORE = 3792
    PROGNOSE_PHOTOVOLTAIK = 3793
    PROGNOSE_SONSTIGE = 6379
    PROGNOSE_GESAMTLAST = 3794
    PROGNOSE_RESIDUALLAST = 3795
    PROGNOSE_PUMPSPEICHER = 3796

    # Realised consumption
    GESAMT_VERBRAUCH = 410
    RESIDUALLAST = 4359
    PUMPSPEICHER_VERBRAUCH = 4387

    # Market prices (EUR/MWh)
    DAY_AHEAD_DE_LU = 4169
    DAY_AHEAD_AT = 5078
    DAY_AHEAD_BE = 4996
    DAY_AHEAD_NO2 = 4997
    DAY_AHEAD_NL = 5313

    # Cross-border trade
    GRENZHANDEL_GESAMT = 1152
    GRENZHANDEL_DE_AT = 4081
    GRENZHANDEL_DE_CZ = 4082
    GRENZHANDEL_DE_DK1 = 4083
    GRENZHANDEL_DE_DK2 = 4084
    GRENZHANDEL_DE_FR = 4085
    GRENZHANDEL_DE_NL = 4086
    GRENZHANDEL_DE_NO2 = 4087
    GRENZHANDEL_DE_PL = 4088
    GRENZHANDEL_DE_SE4 = 4089
    GRENZHANDEL_DE_CH = 4090

    @property
    def label(self) -> str:
        """Human-readable label; falls back to the member name."""
        return _FILTER_LABELS.get(self, self.name)

    @property
    def unit(self) -> str:
        """Physical unit of the series; defaults to MWh."""
        return _FILTER_UNITS.get(self, "MWh")

    @property
    def group(self) -> str:
        """Category group the filter belongs to; defaults to 'Sonstige'."""
        return _FILTER_GROUPS.get(self, "Sonstige")

Region

Bases: StrEnum

Verfügbare Regionen / Regelzonen.

Source code in datadancer/sources/smard/constants.py
155
156
157
158
159
160
161
162
163
164
165
class Region(StrEnum):
    """Verfügbare Regionen / Regelzonen."""
    DE          = "DE"
    AT          = "AT"
    LU          = "LU"
    DE_LU       = "DE-LU"
    DE_AT_LU    = "DE-AT-LU"
    FIFTYHERTZ  = "50Hertz"
    AMPRION     = "Amprion"
    TENNET      = "TenneT"
    TRANSNETBW  = "TransnetBW"

Resolution

Bases: StrEnum

Zeitliche Auflösung der Zeitreihe.

Source code in datadancer/sources/smard/constants.py
168
169
170
171
172
173
174
175
class Resolution(StrEnum):
    """Zeitliche Auflösung der Zeitreihe."""
    QUARTERHOUR = "quarterhour"
    HOUR        = "hour"
    DAY         = "day"
    WEEK        = "week"
    MONTH       = "month"
    YEAR        = "year"

SmardClient

Async Client für die SMARD Strommarktdaten-API.

Verwendung als Kontextmanager (empfohlen):

    async with SmardClient() as client:
        df = await client.fetch_latest(Filter.WIND_ONSHORE)

Parameters:

Name Type Description Default
region Region

Standard-Region (default: DE)

DE
resolution Resolution

Standard-Auflösung (default: HOUR)

HOUR
tz str

Zeitzone für den DataFrame-Index (default: Europe/Berlin)

'Europe/Berlin'
retries int

Wiederholungsversuche bei HTTP-Fehlern

3
Source code in datadancer/sources/smard/client.py
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
class SmardClient:
    """
    Async client for the SMARD electricity market data API.

    Recommended usage as an async context manager:
        async with SmardClient() as client:
            df = await client.fetch_latest(Filter.WIND_ONSHORE)

    Args:
        region:     Default region (default: DE)
        resolution: Default resolution (default: HOUR)
        tz:         Time zone for the DataFrame index (default: Europe/Berlin)
        retries:    Number of retry attempts on HTTP errors
    """

    def __init__(
        self,
        region: Region = Region.DE,
        resolution: Resolution = Resolution.HOUR,
        tz: str = "Europe/Berlin",
        retries: int = 3,
    ) -> None:
        self.default_region = region
        self.default_resolution = resolution
        self.tz = tz
        self._retries = retries
        self._client: httpx.AsyncClient | None = None

    async def __aenter__(self) -> "SmardClient":
        self._client = httpx.AsyncClient(headers=_HEADERS, timeout=_TIMEOUT)
        return self

    async def __aexit__(self, *_) -> None:
        await self.close()

    async def close(self) -> None:
        """Close the underlying HTTP client (idempotent)."""
        if self._client:
            await self._client.aclose()
            self._client = None

    @property
    def _http(self) -> httpx.AsyncClient:
        # Lazily create a client so the class also works without the
        # context-manager protocol.
        if self._client is None:
            self._client = httpx.AsyncClient(headers=_HEADERS, timeout=_TIMEOUT)
        return self._client

    # ── Internal API calls ────────────────────────────────────────────────────

    async def _get_json(self, url: str) -> dict:
        """GET *url* as JSON with exponential backoff.

        Raises:
            RuntimeError: chained to the last HTTP error once all retries fail.
        """
        last_exc: Exception | None = None
        for attempt in range(self._retries):
            try:
                r = await self._http.get(url)
                r.raise_for_status()
                return r.json()
            # httpx.TimeoutException is a subclass of httpx.HTTPError, so a
            # single except clause covers both.
            except httpx.HTTPError as e:
                last_exc = e
                if attempt < self._retries - 1:
                    await asyncio.sleep(2 ** attempt)  # 1s, 2s, 4s, ...
        raise RuntimeError(f"SMARD API nicht erreichbar: {last_exc}") from last_exc

    async def _fetch_timestamps(self, filter_id: int, region: str, resolution: str) -> list[int]:
        """Available block timestamps (ms since epoch) for filter/region/resolution."""
        # int() keeps the URL stable even when a Filter enum member is passed:
        # str()/format() behaviour of IntEnum changed across Python versions.
        url = f"{_BASE_URL}/{int(filter_id)}/{region}/index_{resolution}.json"
        return (await self._get_json(url)).get("timestamps", [])

    async def _fetch_series(self, filter_id: int, region: str, resolution: str, timestamp: int) -> list:
        """Raw [timestamp_ms, value] pairs of one data block."""
        fid = int(filter_id)
        url = f"{_BASE_URL}/{fid}/{region}/{fid}_{region}_{resolution}_{timestamp}.json"
        return (await self._get_json(url)).get("series", [])

    # ── Public API ────────────────────────────────────────────────────────────

    async def fetch(
        self,
        filter: Filter,
        *,
        region: Region | None = None,
        resolution: Resolution | None = None,
        start: datetime | None = None,
        end: datetime | None = None,
        dropna: bool = True,
    ) -> pd.DataFrame:
        """
        Time series for one SMARD filter as a DataFrame.

        Returns:
            DataFrame with a DatetimeIndex (self.tz) and columns:
            value, unit, filter, label, region
        """
        rgn = str(region or self.default_region)
        res = str(resolution or self.default_resolution)

        timestamps = await self._fetch_timestamps(filter, rgn, res)
        if not timestamps:
            return self._empty_df()

        ts_start = int(start.timestamp() * 1000) if start else 0
        ts_end = int(end.timestamp() * 1000) if end else float("inf")
        relevant = [t for t in timestamps if ts_start <= t <= ts_end]
        if not relevant:
            return self._empty_df()

        chunks = await asyncio.gather(
            *[self._fetch_series(filter, rgn, res, ts) for ts in relevant]
        )
        # Later blocks win on duplicate timestamps.
        raw = {ts: val for chunk in chunks for ts, val in chunk}
        return self._build_df(raw, filter, rgn, dropna)

    async def fetch_latest(
        self,
        filter: Filter,
        *,
        hours: int = 48,
        region: Region | None = None,
        resolution: Resolution | None = None,
    ) -> pd.DataFrame:
        """Last *hours* hours, counted back from the newest available data point."""
        rgn = str(region or self.default_region)
        res = str(resolution or self.default_resolution)

        timestamps = await self._fetch_timestamps(filter, rgn, res)
        if not timestamps:
            return self._empty_df()

        # Load only the newest block — it holds the most recent data points.
        series = await self._fetch_series(filter, rgn, res, timestamps[-1])

        raw = {ts: val for ts, val in series}
        df = self._build_df(raw, filter, rgn, dropna=True)
        if df.empty:
            return df

        # Trim to the requested window, anchored at the last data point.
        cutoff = df.index[-1] - timedelta(hours=hours)
        return df[df.index >= cutoff]

    async def fetch_multi(
        self,
        filters: Sequence[Filter],
        *,
        region: Region | None = None,
        resolution: Resolution | None = None,
        start: datetime | None = None,
        end: datetime | None = None,
        hours: int | None = None,
        dropna: bool = True,
        wide: bool = True,
    ) -> pd.DataFrame:
        """
        Fetch several filters in parallel.

        Args:
            hours: when given, uses fetch_latest() per filter; otherwise
                   fetch() with start/end.
            wide:  True -> one value column per filter (named after the
                   filter); False -> long format, frames stacked vertically.

        Returns:
            Possibly empty DataFrame. (Previously an all-empty result raised
            ValueError from pd.concat on an empty list.)
        """
        if hours is not None:
            # fetch_latest per filter — loads the newest block, trimmed to `hours`.
            dfs = await asyncio.gather(*[
                self.fetch_latest(f, hours=hours, region=region, resolution=resolution)
                for f in filters
            ])
        else:
            dfs = await asyncio.gather(*[
                self.fetch(f, region=region, resolution=resolution,
                           start=start, end=end, dropna=dropna)
                for f in filters
            ])

        non_empty = [(f, df) for f, df in zip(filters, dfs) if not df.empty]
        if wide:
            if not non_empty:
                # pd.concat([]) raises ValueError — return an empty frame instead.
                empty = pd.DataFrame()
                empty.index.name = "timestamp"
                return empty
            combined = pd.concat(
                [df["value"].rename(f.name) for f, df in non_empty],
                axis=1,
            )
            combined.index.name = "timestamp"
            return combined
        if not non_empty:
            return self._empty_df()
        return pd.concat([df for _, df in non_empty], ignore_index=False)

    async def fetch_generation_mix(
        self,
        *,
        region: Region | None = None,
        resolution: Resolution | None = None,
        hours: int = 48,
    ) -> pd.DataFrame:
        """Complete generation mix as a wide DataFrame with readable column names."""
        filters = [
            Filter.BIOMASSE, Filter.WASSERKRAFT, Filter.WIND_ONSHORE,
            Filter.WIND_OFFSHORE, Filter.PHOTOVOLTAIK, Filter.SONSTIGE_ERNEUERBARE,
            Filter.BRAUNKOHLE, Filter.STEINKOHLE, Filter.ERDGAS,
            Filter.KERNENERGIE,
        ]
        df = await self.fetch_multi(filters, region=region, resolution=resolution,
                                    hours=hours, wide=True)
        df.columns = [Filter[c].label if c in Filter.__members__ else c for c in df.columns]
        return df

    async def fetch_renewable_share(
        self,
        *,
        region: Region | None = None,
        resolution: Resolution | None = None,
        hours: int = 48,
    ) -> pd.DataFrame:
        """
        Share of renewables in total generation.

        Returns:
            DataFrame with: renewable_mwh, total_mwh, renewable_pct
        """
        renewable = [Filter.BIOMASSE, Filter.WASSERKRAFT, Filter.WIND_ONSHORE,
                     Filter.WIND_OFFSHORE, Filter.PHOTOVOLTAIK, Filter.SONSTIGE_ERNEUERBARE]
        conventional = [Filter.BRAUNKOHLE, Filter.STEINKOHLE, Filter.ERDGAS, Filter.KERNENERGIE]

        re_df, conv_df = await asyncio.gather(
            self.fetch_multi(renewable, region=region, resolution=resolution, hours=hours, wide=True),
            self.fetch_multi(conventional, region=region, resolution=resolution, hours=hours, wide=True),
        )
        re_sum = re_df.sum(axis=1)      # hoisted: was computed twice before
        conv_sum = conv_df.sum(axis=1)
        result = pd.DataFrame(index=re_df.index.union(conv_df.index))
        result["renewable_mwh"] = re_sum
        result["total_mwh"] = re_sum + conv_sum
        result["renewable_pct"] = (result["renewable_mwh"] / result["total_mwh"] * 100).round(1)
        return result.dropna()

    async def fetch_price_and_load(
        self,
        *,
        region: Region | None = None,
        resolution: Resolution | None = None,
        hours: int = 48,
    ) -> pd.DataFrame:
        """Day-ahead price and total load combined."""
        df = await self.fetch_multi(
            [Filter.DAY_AHEAD_DE_LU, Filter.GESAMT_VERBRAUCH],
            region=region, resolution=resolution, hours=hours, wide=True,
        )
        # Rename by filter name instead of assigning positionally: positional
        # assignment raised ValueError (length mismatch) whenever one of the
        # two series came back empty.
        return df.rename(columns={
            Filter.DAY_AHEAD_DE_LU.name: "price_eur_mwh",
            Filter.GESAMT_VERBRAUCH.name: "load_mwh",
        })

    async def list_available_timestamps(
        self,
        filter: Filter,
        *,
        region: Region | None = None,
        resolution: Resolution | None = None,
    ) -> pd.DatetimeIndex:
        """All available block timestamps as a tz-aware DatetimeIndex."""
        rgn = str(region or self.default_region)
        res = str(resolution or self.default_resolution)
        timestamps = await self._fetch_timestamps(filter, rgn, res)
        return pd.to_datetime(timestamps, unit="ms", utc=True).tz_convert(self.tz)

    # ── Helpers ───────────────────────────────────────────────────────────────

    def _build_df(self, raw: dict, filter: Filter, region: str, dropna: bool) -> pd.DataFrame:
        """Convert a {timestamp_ms: value} mapping into the canonical result frame."""
        df = pd.DataFrame(sorted(raw.items()), columns=["timestamp_ms", "value"])
        df["timestamp"] = (pd.to_datetime(df["timestamp_ms"], unit="ms", utc=True)
                           .dt.tz_convert(self.tz))
        df = df.set_index("timestamp").drop(columns=["timestamp_ms"])
        df["unit"] = filter.unit
        df["filter"] = filter.name
        df["label"] = filter.label
        df["region"] = region
        return df.dropna(subset=["value"]) if dropna else df

    @staticmethod
    def _empty_df() -> pd.DataFrame:
        """Empty frame with the canonical column layout."""
        return pd.DataFrame(columns=["value", "unit", "filter", "label", "region"])

fetch(filter, *, region=None, resolution=None, start=None, end=None, dropna=True) async

Zeitreihe für einen SMARD-Filter als DataFrame.

Returns:

Type Description
DataFrame

DataFrame mit DatetimeIndex (Europe/Berlin) und Spalten:

DataFrame

value, unit, filter, label, region

Source code in datadancer/sources/smard/client.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
async def fetch(
    self,
    filter: Filter,
    *,
    region: Region | None = None,
    resolution: Resolution | None = None,
    start: datetime | None = None,
    end: datetime | None = None,
    dropna: bool = True,
) -> pd.DataFrame:
    """
    Time series for one SMARD filter as a DataFrame.

    Returns:
        DataFrame with a DatetimeIndex (Europe/Berlin) and columns:
        value, unit, filter, label, region
    """
    chosen_region = str(region or self.default_region)
    chosen_res = str(resolution or self.default_resolution)

    available = await self._fetch_timestamps(filter, chosen_region, chosen_res)
    if not available:
        return self._empty_df()

    lower = int(start.timestamp() * 1000) if start else 0
    upper = int(end.timestamp() * 1000) if end else float("inf")
    selected = [t for t in available if lower <= t <= upper]
    if not selected:
        return self._empty_df()

    blocks = await asyncio.gather(
        *(self._fetch_series(filter, chosen_region, chosen_res, t) for t in selected)
    )
    # Merge all blocks; later blocks win on duplicate timestamps.
    merged: dict = {}
    for block in blocks:
        for ts, value in block:
            merged[ts] = value
    return self._build_df(merged, filter, chosen_region, dropna)

fetch_generation_mix(*, region=None, resolution=None, hours=48) async

Vollständiger Erzeugungsmix als Wide-DataFrame (lesbare Spaltennamen).

Source code in datadancer/sources/smard/client.py
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
async def fetch_generation_mix(
    self,
    *,
    region: Region | None = None,
    resolution: Resolution | None = None,
    hours: int = 48,
) -> pd.DataFrame:
    """Complete generation mix as a wide DataFrame with readable column names."""
    generation_filters = [
        Filter.BIOMASSE,
        Filter.WASSERKRAFT,
        Filter.WIND_ONSHORE,
        Filter.WIND_OFFSHORE,
        Filter.PHOTOVOLTAIK,
        Filter.SONSTIGE_ERNEUERBARE,
        Filter.BRAUNKOHLE,
        Filter.STEINKOHLE,
        Filter.ERDGAS,
        Filter.KERNENERGIE,
    ]
    mix = await self.fetch_multi(
        generation_filters, region=region, resolution=resolution, hours=hours, wide=True
    )
    # Swap enum member names for their human-readable labels where possible.
    readable = []
    for col in mix.columns:
        readable.append(Filter[col].label if col in Filter.__members__ else col)
    mix.columns = readable
    return mix

fetch_latest(filter, *, hours=48, region=None, resolution=None) async

Letzten N Stunden ab dem letzten verfügbaren Datenpunkt.

Source code in datadancer/sources/smard/client.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
async def fetch_latest(
    self,
    filter: Filter,
    *,
    hours: int = 48,
    region: Region | None = None,
    resolution: Resolution | None = None,
) -> pd.DataFrame:
    """Last *hours* hours, counted back from the newest available data point."""
    chosen_region = str(region or self.default_region)
    chosen_res = str(resolution or self.default_resolution)

    available = await self._fetch_timestamps(filter, chosen_region, chosen_res)
    if not available:
        return self._empty_df()

    # Only the newest block is needed — it holds the most recent values.
    newest_block = available[-1]
    pairs = await self._fetch_series(filter, chosen_region, chosen_res, newest_block)

    # Convert the raw [timestamp, value] pairs into a DataFrame.
    frame = self._build_df(dict(pairs), filter, chosen_region, dropna=True)
    if frame.empty:
        return frame

    # Trim to the requested window, anchored at the newest data point.
    window_start = frame.index[-1] - timedelta(hours=hours)
    return frame.loc[frame.index >= window_start]

fetch_multi(filters, *, region=None, resolution=None, start=None, end=None, hours=None, dropna=True, wide=True) async

Mehrere Filter parallel abrufen.

Source code in datadancer/sources/smard/client.py
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
async def fetch_multi(
    self,
    filters: Sequence[Filter],
    *,
    region: Region | None = None,
    resolution: Resolution | None = None,
    start: datetime | None = None,
    end: datetime | None = None,
    hours: int | None = None,
    dropna: bool = True,
    wide: bool = True,
) -> pd.DataFrame:
    """
    Fetch several filters in parallel.

    Args:
        hours: when given, uses fetch_latest() per filter (newest block,
               trimmed to *hours*); otherwise fetch() with start/end.
        wide:  True -> one value column per filter (named after the filter);
               False -> long format, frames stacked vertically.

    Returns:
        Possibly empty DataFrame. (Previously an all-empty result raised
        ValueError from pd.concat on an empty list.)
    """
    if hours is not None:
        # fetch_latest per filter — loads the newest block, trimmed to `hours`.
        dfs = await asyncio.gather(*[
            self.fetch_latest(f, hours=hours, region=region, resolution=resolution)
            for f in filters
        ])
    else:
        dfs = await asyncio.gather(*[
            self.fetch(f, region=region, resolution=resolution,
                       start=start, end=end, dropna=dropna)
            for f in filters
        ])

    non_empty = [(f, df) for f, df in zip(filters, dfs) if not df.empty]
    if wide:
        if not non_empty:
            # pd.concat([]) raises ValueError — return an empty frame instead.
            empty = pd.DataFrame()
            empty.index.name = "timestamp"
            return empty
        combined = pd.concat(
            [df["value"].rename(f.name) for f, df in non_empty],
            axis=1,
        )
        combined.index.name = "timestamp"
        return combined
    if not non_empty:
        return pd.DataFrame(columns=["value", "unit", "filter", "label", "region"])
    return pd.concat([df for _, df in non_empty], ignore_index=False)

fetch_price_and_load(*, region=None, resolution=None, hours=48) async

Day-Ahead-Preis und Gesamtlast kombiniert.

Source code in datadancer/sources/smard/client.py
257
258
259
260
261
262
263
264
265
266
267
268
269
270
async def fetch_price_and_load(
    self,
    *,
    region: Region | None = None,
    resolution: Resolution | None = None,
    hours: int = 48,
) -> pd.DataFrame:
    """Day-ahead price and total load combined.

    Returns:
        DataFrame with columns price_eur_mwh and load_mwh; a column is
        absent when the corresponding series came back empty.
    """
    df = await self.fetch_multi(
        [Filter.DAY_AHEAD_DE_LU, Filter.GESAMT_VERBRAUCH],
        region=region, resolution=resolution, hours=hours, wide=True,
    )
    # Rename by filter name instead of assigning positionally: positional
    # assignment raised ValueError (length mismatch) whenever one of the
    # two series came back empty.
    return df.rename(columns={
        Filter.DAY_AHEAD_DE_LU.name: "price_eur_mwh",
        Filter.GESAMT_VERBRAUCH.name: "load_mwh",
    })

fetch_renewable_share(*, region=None, resolution=None, hours=48) async

Erneuerbaren-Anteil an der Gesamterzeugung.

Returns:

Type Description
DataFrame

DataFrame mit: renewable_mwh, total_mwh, renewable_pct

Source code in datadancer/sources/smard/client.py
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
async def fetch_renewable_share(
    self,
    *,
    region: Region | None = None,
    resolution: Resolution | None = None,
    hours: int = 48,
) -> pd.DataFrame:
    """
    Share of renewables in total generation.

    Returns:
        DataFrame with: renewable_mwh, total_mwh, renewable_pct
    """
    green = [
        Filter.BIOMASSE, Filter.WASSERKRAFT, Filter.WIND_ONSHORE,
        Filter.WIND_OFFSHORE, Filter.PHOTOVOLTAIK, Filter.SONSTIGE_ERNEUERBARE,
    ]
    fossil_nuclear = [
        Filter.BRAUNKOHLE, Filter.STEINKOHLE, Filter.ERDGAS, Filter.KERNENERGIE,
    ]

    green_df, other_df = await asyncio.gather(
        self.fetch_multi(green, region=region, resolution=resolution, hours=hours, wide=True),
        self.fetch_multi(fossil_nuclear, region=region, resolution=resolution, hours=hours, wide=True),
    )
    green_total = green_df.sum(axis=1)
    result = pd.DataFrame(index=green_df.index.union(other_df.index))
    result["renewable_mwh"] = green_total
    result["total_mwh"] = green_total + other_df.sum(axis=1)
    result["renewable_pct"] = (result["renewable_mwh"] / result["total_mwh"] * 100).round(1)
    return result.dropna()

list_available_timestamps(filter, *, region=None, resolution=None) async

Alle verfügbaren Zeitstempel als DatetimeIndex.

Source code in datadancer/sources/smard/client.py
272
273
274
275
276
277
278
279
280
281
282
283
async def list_available_timestamps(
    self,
    filter: Filter,
    *,
    region: Region | None = None,
    resolution: Resolution | None = None,
) -> pd.DatetimeIndex:
    """All available block timestamps as a tz-aware DatetimeIndex."""
    chosen_region = str(region or self.default_region)
    chosen_res = str(resolution or self.default_resolution)
    raw_ms = await self._fetch_timestamps(filter, chosen_region, chosen_res)
    index = pd.to_datetime(raw_ms, unit="ms", utc=True)
    return index.tz_convert(self.tz)