I am a new user of Dask and RapidsAI.
An exerpt of my data (in csv
format):
Symbol,Date,Open,High,Low,Close,Volume
AADR,17-Oct-2017 09:00,57.47,58.3844,57.3645,58.3844,2094
AADR,17-Oct-2017 10:00,57.27,57.2856,57.25,57.27,627
AADR,17-Oct-2017 11:00,56.99,56.99,56.99,56.99,100
AADR,17-Oct-2017 12:00,56.98,57.05,56.98,57.05,200
AADR,17-Oct-2017 13:00,57.14,57.16,57.14,57.16,700
AADR,17-Oct-2017 14:00,57.13,57.13,57.13,57.13,100
AADR,17-Oct-2017 15:00,57.07,57.07,57.07,57.07,200
AAMC,17-Oct-2017 09:00,87,87,87,87,100
AAU,17-Oct-2017 09:00,1.1,1.13,1.0832,1.121,67790
AAU,17-Oct-2017 10:00,1.12,1.12,1.12,1.12,100
AAU,17-Oct-2017 11:00,1.125,1.125,1.125,1.125,200
AAU,17-Oct-2017 12:00,1.1332,1.15,1.1332,1.15,27439
AAU,17-Oct-2017 13:00,1.15,1.15,1.13,1.13,8200
AAU,17-Oct-2017 14:00,1.1467,1.1467,1.14,1.1467,1750
AAU,17-Oct-2017 15:00,1.1401,1.1493,1.1401,1.1493,4100
AAU,17-Oct-2017 16:00,1.13,1.13,1.13,1.13,100
ABE,17-Oct-2017 09:00,14.64,14.64,14.64,14.64,200
ABE,17-Oct-2017 10:00,14.67,14.67,14.66,14.66,1200
ABE,17-Oct-2017 11:00,14.65,14.65,14.65,14.65,600
ABE,17-Oct-2017 15:00,14.65,14.65,14.65,14.65,836
Note Date
column is of type string.
I have some example stock market timeseries data (i.e., DOHLCV) in csv files and I read them into a dask_cudf
dataframe (my dask.dataframe
backend is cudf and read.csv
is a creation dispacther that conveniently gives me a cudf.dataframe
).
import dask_cudf
import cudf
from dask import dataframe as dd
ddf = dd.read_csv('path/to/my/data/*.csv')
ddf
# output
<dask_cudf.DataFrame | 450 tasks | 450 npartitions>
# test csv data above can be retrieved using following statements
# df = pd.read_clipboard(sep=",")
# cdf = cudf.from_pandas(df)
# ddf = dask_cudf.from_cudf(cdf, npartitions=2)
I then try to convert datetime string into real datetime object (np.datetime64[ns]
or anything equivalent in cudf
/dask
world). I then failed with error.
df["Date"] = dd.to_datetime(df["Date"], format="%d-%b-%Y %H:%M").head(5)
df.set_index("Date", inplace=True) # This failed with different error, will raise in a different SO thread.
# Following statement gives me same error.
# cudf.to_datetime(df["Date"], format="%d-%b-%Y %H:%M")
Full error log is to the end.
The error message seems to suggest that I’d need to compute
the dask_cudf.dataframe
, turning it into a real cudf
object, then I
can do as I would in pandas
:
df["Date"] = cudf.to_datetime(df.Date)
df = df.set_index(df.Date)
This apparently isn’t ideal and it very much is the thing that dask
is for: we’d delay this and only calculate the ultimate number we need.
what is the dask
/dask_cudf
way to convert a string column to datetime column in dask_cudf
? As far as I can see, if the backend is pandas
, the conversion is done smoothly and rarely has problem.
Or, is it that cudf
or GPU world in general, is not supposed to do much with date types like datetime
, string
? (e.g., ideally GPU is geared towards expensive numerical computations).
My use case involves some filtering to do with string
and datetime
, therefore I need to set up the dataframe
with proper datetime
object.
Error Log
TypeError Traceback (most recent call last)
Cell In[52], line 1
----> 1 dd.to_datetime(df["Date"], format="%d-%b-%Y %H:%M").head(2)
File ~/Live-usb-storage/projects/python/alpha/lib/python3.10/site-packages/dask/dataframe/core.py:1268, in _Frame.head(self, n, npartitions, compute)
1266 # No need to warn if we're already looking at all partitions
1267 safe = npartitions != self.npartitions
-> 1268 return self._head(n=n, npartitions=npartitions, compute=compute, safe=safe)
File ~/Live-usb-storage/projects/python/alpha/lib/python3.10/site-packages/dask/dataframe/core.py:1302, in _Frame._head(self, n, npartitions, compute, safe)
1297 result = new_dd_object(
1298 graph, name, self._meta, [self.divisions[0], self.divisions[npartitions]]
1299 )
1301 if compute:
-> 1302 result = result.compute()
1303 return result
File ~/Live-usb-storage/projects/python/alpha/lib/python3.10/site-packages/dask/base.py:314, in DaskMethodsMixin.compute(self, **kwargs)
290 def compute(self, **kwargs):
291 """Compute this dask collection
292
293 This turns a lazy Dask collection into its in-memory equivalent.
(...)
312 dask.base.compute
313 """
--> 314 (result,) = compute(self, traverse=False, **kwargs)
315 return result
File ~/Live-usb-storage/projects/python/alpha/lib/python3.10/site-packages/dask/base.py:599, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
596 keys.append(x.__dask_keys__())
597 postcomputes.append(x.__dask_postcompute__())
--> 599 results = schedule(dsk, keys, **kwargs)
600 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
File ~/Live-usb-storage/projects/python/alpha/lib/python3.10/site-packages/dask/threaded.py:89, in get(dsk, keys, cache, num_workers, pool, **kwargs)
86 elif isinstance(pool, multiprocessing.pool.Pool):
87 pool = MultiprocessingPoolExecutor(pool)
---> 89 results = get_async(
90 pool.submit,
91 pool._max_workers,
92 dsk,
93 keys,
94 cache=cache,
95 get_id=_thread_get_id,
96 pack_exception=pack_exception,
97 **kwargs,
98 )
100 # Cleanup pools associated to dead threads
101 with pools_lock:
File ~/Live-usb-storage/projects/python/alpha/lib/python3.10/site-packages/dask/local.py:511, in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
509 _execute_task(task, data) # Re-execute locally
510 else:
--> 511 raise_exception(exc, tb)
512 res, worker_id = loads(res_info)
513 state["cache"][key] = res
File ~/Live-usb-storage/projects/python/alpha/lib/python3.10/site-packages/dask/local.py:319, in reraise(exc, tb)
317 if exc.__traceback__ is not tb:
318 raise exc.with_traceback(tb)
--> 319 raise exc
File ~/Live-usb-storage/projects/python/alpha/lib/python3.10/site-packages/dask/local.py:224, in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
222 try:
223 task, data = loads(task_info)
--> 224 result = _execute_task(task, data)
225 id = get_id()
226 result = dumps((result, id))
File ~/Live-usb-storage/projects/python/alpha/lib/python3.10/site-packages/dask/core.py:119, in _execute_task(arg, cache, dsk)
115 func, args = arg[0], arg[1:]
116 # Note: Don't assign the subtask results to a variable. numpy detects
117 # temporaries by their reference count and can execute certain
118 # operations in-place.
--> 119 return func(*(_execute_task(a, cache) for a in args))
120 elif not ishashable(arg):
121 return arg
File ~/Live-usb-storage/projects/python/alpha/lib/python3.10/site-packages/dask/optimization.py:990, in SubgraphCallable.__call__(self, *args)
988 if not len(args) == len(self.inkeys):
989 raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 990 return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
File ~/Live-usb-storage/projects/python/alpha/lib/python3.10/site-packages/dask/core.py:149, in get(dsk, out, cache)
147 for key in toposort(dsk):
148 task = dsk[key]
--> 149 result = _execute_task(task, cache)
150 cache[key] = result
151 result = _execute_task(out, cache)
File ~/Live-usb-storage/projects/python/alpha/lib/python3.10/site-packages/dask/core.py:119, in _execute_task(arg, cache, dsk)
115 func, args = arg[0], arg[1:]
116 # Note: Don't assign the subtask results to a variable. numpy detects
117 # temporaries by their reference count and can execute certain
118 # operations in-place.
--> 119 return func(*(_execute_task(a, cache) for a in args))
120 elif not ishashable(arg):
121 return arg
File ~/Live-usb-storage/projects/python/alpha/lib/python3.10/site-packages/dask/utils.py:72, in apply(func, args, kwargs)
41 """Apply a function given its positional and keyword arguments.
42
43 Equivalent to ``func(*args, **kwargs)``
(...)
69 >>> dsk = {'task-name': task} # adds the task to a low level Dask task graph
70 """
71 if kwargs:
---> 72 return func(*args, **kwargs)
73 else:
74 return func(*args)
File ~/Live-usb-storage/projects/python/alpha/lib/python3.10/site-packages/dask/dataframe/core.py:6821, in apply_and_enforce(*args, **kwargs)
6819 func = kwargs.pop("_func")
6820 meta = kwargs.pop("_meta")
-> 6821 df = func(*args, **kwargs)
6822 if is_dataframe_like(df) or is_series_like(df) or is_index_like(df):
6823 if not len(df):
File ~/Live-usb-storage/projects/python/alpha/lib/python3.10/site-packages/pandas/core/tools/datetimes.py:1100, in to_datetime(arg, errors, dayfirst, yearfirst, utc, format, exact, unit, infer_datetime_format, origin, cache)
1098 result = _convert_and_box_cache(argc, cache_array)
1099 else:
-> 1100 result = convert_listlike(argc, format)
1101 else:
1102 result = convert_listlike(np.array([arg]), format)[0]
File ~/Live-usb-storage/projects/python/alpha/lib/python3.10/site-packages/pandas/core/tools/datetimes.py:413, in _convert_listlike_datetimes(arg, format, name, tz, unit, errors, infer_datetime_format, dayfirst, yearfirst, exact)
410 return idx
411 raise
--> 413 arg = ensure_object(arg)
414 require_iso8601 = False
416 if infer_datetime_format and format is None:
File pandas/_libs/algos_common_helper.pxi:33, in pandas._libs.algos.ensure_object()
File ~/Live-usb-storage/projects/python/alpha/lib/python3.10/site-packages/cudf/core/frame.py:451, in Frame.__array__(self, dtype)
450 def __array__(self, dtype=None):
--> 451 raise TypeError(
452 "Implicit conversion to a host NumPy array via __array__ is not "
453 "allowed, To explicitly construct a GPU matrix, consider using "
454 ".to_cupy()\nTo explicitly construct a host matrix, consider "
455 "using .to_numpy()."
456 )
TypeError: Implicit conversion to a host NumPy array via __array__ is not allowed, To explicitly construct a GPU matrix, consider using .to_cupy()
To explicitly construct a host matrix, consider using .to_numpy().