-
Notifications
You must be signed in to change notification settings - Fork 18
Closed
Description
TLDR: I only get 2 of 26 partitions written, unless I specify a region in which case I get all of them.
I have a conda env with coiled-runtime=0.0.4 and an editable install of coiled:
conda create -n coiled-runtime-004 -c coiled -c conda-forge python=3.9 coiled-runtime=0.0.4 -y
conda activate coiled-runtime-004
pip install -e python-api
AWS_PROFILE=test-default ipython
Then I make a cluster and try to write some parquet:
import coiled
cluster = coiled.Cluster(n_workers=2, software='coiled/coiled-runtime-0-0-4-py39')
import dask
from dask.distributed import Client
client = Client(cluster)
df = dask.datasets.timeseries("2000", "2001", partition_freq="2w")
df.to_parquet("s3://coiled-nat-test-account-0/out12",
# storage_options={"client_kwargs": {"region_name": "us-west-1"}}
)
Traceback:
In [7]: df = dask.datasets.timeseries("2000", "2001", partition_freq="2w")
...: df.to_parquet("s3://coiled-nat-test-account-0/out12",
...: # storage_options={"client_kwargs": {"region_name": "us-west-1"}}
...: )
---------------------------------------------------------------------------
ClientError Traceback (most recent call last)
File /opt/conda/lib/python3.9/site-packages/s3fs/core.py:110, in _error_wrapper()
File /opt/conda/lib/python3.9/site-packages/aiobotocore/client.py:265, in _make_api_call()
ClientError: An error occurred (AccessDenied) when calling the CreateMultipartUpload operation: No AWSAccessKey was presented.
The above exception was the direct cause of the following exception:
PermissionError Traceback (most recent call last)
Input In [7], in <cell line: 2>()
1 df = dask.datasets.timeseries("2000", "2001", partition_freq="2w")
----> 2 df.to_parquet("s3://coiled-nat-test-account-0/out12",
3 # storage_options={"client_kwargs": {"region_name": "us-west-1"}}
4 )
File ~/anaconda3/envs/coiled-runtime-004/lib/python3.9/site-packages/dask/dataframe/core.py:5045, in DataFrame.to_parquet(self, path, *args, **kwargs)
5042 """See dd.to_parquet docstring for more information"""
5043 from dask.dataframe.io import to_parquet
-> 5045 return to_parquet(self, path, *args, **kwargs)
File ~/anaconda3/envs/coiled-runtime-004/lib/python3.9/site-packages/dask/dataframe/io/parquet/core.py:930, in to_parquet(df, path, engine, compression, write_index, append, overwrite, ignore_divisions, partition_on, storage_options, custom_metadata, write_metadata_file, compute, compute_kwargs, schema, name_function, **kwargs)
927 out = Scalar(graph, final_name, "")
929 if compute:
--> 930 out = out.compute(**compute_kwargs)
932 # Invalidate the filesystem listing cache for the output path after write.
933 # We do this before returning, even if `compute=False`. This helps ensure
934 # that reading files that were just written succeeds.
935 fs.invalidate_cache(path)
File ~/anaconda3/envs/coiled-runtime-004/lib/python3.9/site-packages/dask/base.py:312, in DaskMethodsMixin.compute(self, **kwargs)
288 def compute(self, **kwargs):
289 """Compute this dask collection
290
291 This turns a lazy Dask collection into its in-memory equivalent.
(...)
310 dask.base.compute
311 """
--> 312 (result,) = compute(self, traverse=False, **kwargs)
313 return result
File ~/anaconda3/envs/coiled-runtime-004/lib/python3.9/site-packages/dask/base.py:600, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
597 keys.append(x.__dask_keys__())
598 postcomputes.append(x.__dask_postcompute__())
--> 600 results = schedule(dsk, keys, **kwargs)
601 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
File ~/anaconda3/envs/coiled-runtime-004/lib/python3.9/site-packages/distributed/client.py:3000, in Client.get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
2998 should_rejoin = False
2999 try:
-> 3000 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
3001 finally:
3002 for f in futures.values():
File ~/anaconda3/envs/coiled-runtime-004/lib/python3.9/site-packages/distributed/client.py:2174, in Client.gather(self, futures, errors, direct, asynchronous)
2172 else:
2173 local_worker = None
-> 2174 return self.sync(
2175 self._gather,
2176 futures,
2177 errors=errors,
2178 direct=direct,
2179 local_worker=local_worker,
2180 asynchronous=asynchronous,
2181 )
File ~/anaconda3/envs/coiled-runtime-004/lib/python3.9/site-packages/distributed/utils.py:320, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
318 return future
319 else:
--> 320 return sync(
321 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
322 )
File ~/anaconda3/envs/coiled-runtime-004/lib/python3.9/site-packages/distributed/utils.py:387, in sync(loop, func, callback_timeout, *args, **kwargs)
385 if error:
386 typ, exc, tb = error
--> 387 raise exc.with_traceback(tb)
388 else:
389 return result
File ~/anaconda3/envs/coiled-runtime-004/lib/python3.9/site-packages/distributed/utils.py:360, in sync.<locals>.f()
358 future = asyncio.wait_for(future, callback_timeout)
359 future = asyncio.ensure_future(future)
--> 360 result = yield future
361 except Exception:
362 error = sys.exc_info()
File ~/anaconda3/envs/coiled-runtime-004/lib/python3.9/site-packages/tornado/gen.py:762, in Runner.run(self)
759 exc_info = None
761 try:
--> 762 value = future.result()
763 except Exception:
764 exc_info = sys.exc_info()
File ~/anaconda3/envs/coiled-runtime-004/lib/python3.9/site-packages/distributed/client.py:2037, in Client._gather(self, futures, errors, direct, local_worker)
2035 exc = CancelledError(key)
2036 else:
-> 2037 raise exception.with_traceback(traceback)
2038 raise exc
2039 if errors == "skip":
File /opt/conda/lib/python3.9/site-packages/dask/optimization.py:990, in __call__()
File /opt/conda/lib/python3.9/site-packages/dask/core.py:149, in get()
File /opt/conda/lib/python3.9/site-packages/dask/core.py:119, in _execute_task()
File /opt/conda/lib/python3.9/site-packages/dask/dataframe/io/parquet/core.py:154, in __call__()
File /opt/conda/lib/python3.9/site-packages/dask/dataframe/io/parquet/arrow.py:714, in write_partition()
File /opt/conda/lib/python3.9/site-packages/pyarrow/parquet/__init__.py:2894, in write_table()
File /opt/conda/lib/python3.9/site-packages/pyarrow/parquet/__init__.py:1006, in write_table()
File ~/anaconda3/envs/coiled-runtime-004/lib/python3.9/site-packages/pyarrow/_parquet.pyx:1752, in pyarrow._parquet.ParquetWriter.write_table()
1750
1751 with nogil:
-> 1752 check_status(self.writer.get()
1753 .WriteTable(deref(ctable), c_row_group_size))
1754
File /opt/conda/lib/python3.9/site-packages/fsspec/spec.py:1494, in write()
File /opt/conda/lib/python3.9/site-packages/fsspec/spec.py:1530, in flush()
File /opt/conda/lib/python3.9/site-packages/s3fs/core.py:1970, in _initiate_upload()
File /opt/conda/lib/python3.9/site-packages/s3fs/core.py:1962, in _call_s3()
File /opt/conda/lib/python3.9/site-packages/fsspec/asyn.py:86, in wrapper()
File /opt/conda/lib/python3.9/site-packages/fsspec/asyn.py:66, in sync()
File /opt/conda/lib/python3.9/site-packages/fsspec/asyn.py:26, in _runner()
File /opt/conda/lib/python3.9/site-packages/s3fs/core.py:332, in _call_s3()
File /opt/conda/lib/python3.9/site-packages/s3fs/core.py:137, in _error_wrapper()
PermissionError: No AWSAccessKey was presented.
There is an access key, and in fact two files do get written:
Finally, if I specify the region then I get all the files:
df.to_parquet("s3://coiled-nat-test-account-0/out13",
storage_options={"client_kwargs": {"region_name": "us-west-1"}}
)
This behavior is consistent: if I try without the region, I get 2 files; if I try with the region, I get all of them.
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels

