Skip to content

Create a scheduler pod when DaskCluster resource is created#397

Merged
jacobtomlinson merged 9 commits intodask:dask-operatorfrom
Matt711:dask-operator
Feb 8, 2022
Merged

Create a scheduler pod when DaskCluster resource is created#397
jacobtomlinson merged 9 commits intodask:dask-operatorfrom
Matt711:dask-operator

Conversation

@Matt711
Copy link
Copy Markdown
Member

@Matt711 Matt711 commented Feb 2, 2022

As a part of #392, the Dask Operator needs to create/update/delete pods and things on Kubernetes whenever we create our custom resources. This PR is concerned with creating and deleting the scheduler pod when we create or delete the DaskCluster resource, respectively.

@Matt711
Copy link
Copy Markdown
Member Author

Matt711 commented Feb 2, 2022

Start the operator with kopf run dask_kubernetes/operator/daskcluster.py and create the DaskCluster resource with kubectl apply -f dask_kubernetes/operator/tests/resources/simplecluster.yaml

/home/nfs/mmurray/anaconda3/lib/python3.9/site-packages/kopf/_core/reactor/running.py:176: FutureWarning: Absence of either namespaces or cluster-wide flag will become an error soon. For now, switching to the cluster-wide mode for backward compatibility.
  warnings.warn("Absence of either namespaces or cluster-wide flag will become an error soon."
[2022-02-02 10:34:05,852] kopf._core.engines.a [INFO    ] Initial authentication has been initiated.
[2022-02-02 10:34:05,868] kopf.activities.auth [INFO    ] Activity 'login_via_pykube' succeeded.
[2022-02-02 10:34:05,883] kopf.activities.auth [INFO    ] Activity 'login_via_client' succeeded.
[2022-02-02 10:34:05,883] kopf._core.engines.a [INFO    ] Initial authentication has finished.
[2022-02-02 10:34:17,641] kopf.objects         [INFO    ] [default/simple-cluster] A DaskCluster has been created called simple-cluster in default with the following config: {'image': 'daskdev/dask:latest', 'imagePullPolicy': 'IfNotPresent', 'imagePullSecrets': None, 'protocol': 'tcp', 'scheduler': {'env': {}, 'resources': {}, 'serviceType': 'ClusterIP'}}
[2022-02-02 10:34:17,661] kopf.objects         [INFO    ] [default/simple-cluster] Scheduler pod  in default is created
[2022-02-02 10:34:17,680] kopf.objects         [INFO    ] [default/simple-cluster] Scheduler service in default is created
[2022-02-02 10:34:17,680] kopf.objects         [INFO    ] [default/simple-cluster] Handler 'daskcluster_create' succeeded.
[2022-02-02 10:34:17,680] kopf.objects         [INFO    ] [default/simple-cluster] Creation is processed: 1 succeeded; 0 failed.
^C[2022-02-02 10:37:39,942] kopf._core.reactor.r [INFO    ] Signal SIGINT is received. Operator is stopping.

Delete the DaskCluster resource with kubectl delete daskcluster my-cluster which will also delete the scheduler pod and service

@jacobtomlinson
Copy link
Copy Markdown
Member

This is great! I ran your instructions locally and saw that when creating the DaskCluster resource it created a scheduler Pod.

Now we need to translate those instructions into a test. If we think about those instructions from the perspective of starting and finishing at a clean and default Kubernetes cluster they would be:

  • Create the CRDs
  • Start the operator
  • Create the example DaskCluster resource
  • Look at the operator logs to make sure the Scheduler pod in default is created happened
    • We might also want to do kubectl get pods and kubectl get svc to make sure our scheduler exists
    • Also we could do kubectl logs <scheduler pod ID> to make sure the scheduler is happy
  • Delete the cluster resource
    • Check that the scheduler pod and service was also deleted
  • Stop the operator
  • Delete the CRDs

If you look at the first and last steps in that list they are a mirror. Create the CRDs first, delete the CRDs last. We can do this with a Pytest fixture. The test will run when the fixture yields, so it's a great way to set up and tear down.

@pytest.fixture
def customresources():
    # Create CRDs
    yield
    # Delete CRDs

In the test prep PR that I did, I've already created fixtures for the CRDs, the operator and the example cluster resource.

I also added a test that uses those fixtures, so when we run it all the test does is assert simplecluster.

@pytest.mark.timeout(60)
@pytest.mark.asyncio
async def test_simplecluster(simplecluster):
# If we get to this point then all fixtures worked ok and we can actually test some things
assert simplecluster

A good first step would be to extend that test to check that the scheduler pod and service exists.

Comment on lines +36 to +31
# Check that the schedeuler pod and service exist
scheduler_pod_name = "simple-cluster-scheduler"
scheduler_service_name = "simple-cluster"

assert scheduler_pod_name in k8s_cluster.kubectl("get", "pods")
assert scheduler_service_name in k8s_cluster.kubectl("get", "services")

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be in test_simplecluster, not the fixture.

Also it is failing because there isn't enough time for the pod to be created before we do get pods here. You could turn each of these into a while.

while scheduler_pod_name not in k8s_cluster.kubectl("get", "pods"):
    await asyncio.sleep(0.1)

If the pod never gets created the test will timeout at 60 seconds and fail.

@jacobtomlinson
Copy link
Copy Markdown
Member

Sorry I accidentally pushed a commit here instead of my own branch, just force pushed to remove it again.

@jacobtomlinson jacobtomlinson marked this pull request as ready for review February 8, 2022 14:15
Copy link
Copy Markdown
Member

@jacobtomlinson jacobtomlinson left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is looking good. We've got a nice structure forming here that we can expand on in future PRs.

@jacobtomlinson jacobtomlinson merged commit 208d57f into dask:dask-operator Feb 8, 2022
jacobtomlinson added a commit that referenced this pull request Apr 26, 2022
* Initial test file (#391)

* Add daskcluster custom resource (#393)

* Initial test file

* Add daskcluster custom resource

* Add Dask Worker Group CRD (#394)

* Add Dask Worker Group CRD

* Add image and replica fields to spec

* Finish DaskWorkerGroup Template

* Update test_customresourcecs

* Normalize line endings to LF

* Update files for LF line endings

Co-authored-by: Matthew Murray <mmurray@dgx15.aselab.nvidia.com>

* Add operator test (#395)

* Add minimal operator code with tests

* Move operator runner into fixture

* Actually run operator and move to a fixture

* Add workergroup test

* Refactor fixtures (#400)

* Create a scheduler pod when DaskCluster resource is created (#397)

* Create a scheduler pod when DaskCluster resource is created

* Upadate DaskCluster example simple-cluster.yaml

* Add tests for creating scheduler pod and service

* Revert "Add tests for creating scheduler pod and service"

This reverts commit bf58f6a.

* Rebase fix merge conflicts

* Check that scheduler pod and service are created

* Fix Dask cluster tests

* Uncomment test

* Kopf is struggling to authenticate in CI, being explicit with config

Co-authored-by: Matthew Murray <mmurray@dgx15.aselab.nvidia.com>
Co-authored-by: Jacob Tomlinson <jtomlinson@nvidia.com>

* Create workers with the Dask Operator (#403)

* Create a scheduler pod when DaskCluster resource is created

* Create worker group when DaskWorkerGroup resource is created

* Create default worker group when DaskCluster resource is created

* Update the DaskWorkerGroup example

* Add test for adding workers

* Add Dask example to operator tests

* Fix dask example in test

* Add timeout before connecting to client in dask cluster test

* Add checks for dask cluster pods

* Wait for the scheduler pod to be created

* Check if the scheduler has started

* Only run test_simplecluster

* Only run test_simplecluster

* Add checks for daskcluster pods

* Remove check scheduler started

* Add timeouts for scheduler to get started

* Add all tests back

* Remove first delay from daskcluster test

* Remove second delay from daskcluster test

* Add localhost port to kubectl port-forward

* Change endpoint address for daskcluster test

* Add aysncio.sleep before running dask example

* Add second aysncio.sleep before running dask example

* Add timeout decorator to simplecluster test

* Increased timeout on simplecluster test

* Remove timeouts in test_simplecluster

* Delete timeout and wait for scheduler in test_simplecluster

* Decrease timneouts

* Increase timeout

* Add the second timer

* Change client endpoint connection

* Remove the first timeout

* Decrease timeout

* Decrease timeout

* Decrease timeout

* Wait for scheduler pod to be Running

* Ditch a flaky check

Co-authored-by: Matthew Murray <mmurray@dgx15.aselab.nvidia.com>
Co-authored-by: Jacob Tomlinson <jtomlinson@nvidia.com>

* Add Scaling to the Dask Operator (#406)

* Create default worker group when DaskCluster resource is created

* Update the DaskWorkerGroup example

* Add test for adding workers

* Add checks for dask cluster pods

* Wait for the scheduler pod to be created

* Only run test_simplecluster

* Remove check scheduler started

* Add timeouts for scheduler to get started

* Add all tests back

* Remove second delay from daskcluster test

* Change endpoint address for daskcluster test

* Add timeout decorator to simplecluster test

* Increased timeout on simplecluster test

* Add scaling to Dask Operator

* Remove changes from test_operator

* Refactor to make use of kopf.on module in Operator

* Remove 'workers' key from custom resources

* Fix name of worker pod in operator test

* Scale cluster in test_operator

* Remove incorrect workers key from dict

* Add timeout back to test_simplecluster

* Scale dask cluster in test_operator

* Wait for the new workers

* Change syntax of kubectl scale

* Comment out scaling in test

* Add scaling up back to test_simplecluster

* Add second scaling to test_simplecluster

* Add timeout decorator for test_simplecluster

* Decrease timeout for test_simplecluster

* Create separate test for scaling

* Wait for the scheduler

* Wait for the scheduler

* Wait for the scheduler

* Rewrite scaling cluster test

* Remove timeout from scaling test

* Add sleep to scaling test

* Rewrite scaling cluster test

* Fix scaling test

* Comment out scaling test

* Connect client to simple-cluster-scheduler

* Add async arg to client

* Remove scheduler name from Client

* Add kop_runner to scaling test

* Build up Dask cluster before scaling

* Wait for service to become ready

* Delete workergroups when cluster is deleted

* Wait for cluster to be deleted

* Wait for cluster to be deleted

* Comment out scaling test

* Wait for cluster to be deleted

* Test only scaling

* Test only scaling

* Run all tests

* Test that cluster has been cleaned up

* Test that cluster has been cleaned up

* Only run the cluster and scaling tests

* Only test cluster and scaling

* Clean up cluster

* Wait for cluster to be ready

* Clean up cluster

* Test scale first

* Ensure cluster gets deleted

* Ensure cluster gets deleted

* Test create cluster first

* Test scale cluster first

* Test create cluster first

* Test scle cluster first

* Wat for scheduler pod

* Wait for scheduler pod

* Clean up code

* Wait for pods to be ready

* Change dask worker names

* Only delete the cluster that test x created

* Remove status fields from crm manifests

Co-authored-by: Matthew Murray <mmurray@dgx15.aselab.nvidia.com>

* Merge main into operator feature branch (#409)

* Fix Scaling Tests (#410)

* Create a scheduler pod when DaskCluster resource is created

* Add tests for creating scheduler pod and service

* Revert "Add tests for creating scheduler pod and service"

This reverts commit bf58f6a.

* Rebase fix merge conflicts

* Check that scheduler pod and service are created

* Fix Dask cluster tests

* Remove timeout from test_simplecluster

* Add timeout back to test_simplecluster

* Add wait flag when deleteing resources

* Wait for 'No resources...' in logs

* Wait for scheduler to be in Running state

* Clean up comments

Co-authored-by: Matthew Murray <mmurray@dgx15.aselab.nvidia.com>

* Scale Dask clusters using Scheduler information (#411)

* Create a scheduler pod when DaskCluster resource is created

* Add tests for creating scheduler pod and service

* Revert "Add tests for creating scheduler pod and service"

This reverts commit bf58f6a.

* Rebase fix merge conflicts

* Check that scheduler pod and service are created

* Fix Dask cluster tests

* Connect to scheduler with RPC

* Restart checks

* Comment out rpc

* RPC logic for scaling down workers

* Fix operator test, worker name changed

* Remove pytest timeout decorator from test cluster

* Remove version req on nest-asyncio

* Add version req on nest-asyncio

* Restart github actions

* Add timeout back

* Get rid of nest-asyncio

* Add a TODO for replacing 'localhost' with service address in rpc

* Update TODO rpc address

Co-authored-by: Matthew Murray <mmurray@dgx15.aselab.nvidia.com>

* Add docker image and manifest for deployment (#415)

* Add docker image and manifest for deployment

* Use higher level module

* Add a cluster manager that supports that Dask Operator (#413)

* Create a scheduler pod when DaskCluster resource is created

* Add tests for creating scheduler pod and service

* Revert "Add tests for creating scheduler pod and service"

This reverts commit bf58f6a.

* Rebase fix merge conflicts

* Check that scheduler pod and service are created

* Fix Dask cluster tests

* Connect to scheduler with RPC

* Restart checks

* Comment out rpc

* RPC logic for scaling down workers

* Fix operator test, worker name changed

* Remove pytest timeout decorator from test cluster

* Remove version req on nest-asyncio

* Add version req on nest-asyncio

* Restart github actions

* Add timeout back

* Get rid of nest-asyncio

* Add a TODO for replacing 'localhost' with service address in rpc

* Update TODO rpc address

* Add a cluster manager tht supports that Dask Operator

* Add some more methods t KubeCluster2

* Add class method to cm for connecting to existing cluster manager

* Add build func for cluster and create daskcluster in KubeCluster2

* Restart checks

* Add cluster auth to KubeCluster2

* Create cluster resource and get pod names with kubectl instead of python client

* Use kubectl in _start

* Add scale and adapt methods

* Connect cluster manager to cluster and add additional worker method

* Add test for KubeCluster2

* Remove rel import from test

* Remove new test

* Restart checks

* Address review commments

* Address comments on temporaryfile and cm docstring

* Delete unused var

* Test check without Operator

* Add operator changes back

* Add cm tests

* remove async from KubeCluster2 instance

* restart checks

* Add asserts to KubeCluster2 tests

* Switch to kubernetes-asyncio

* Simplify operator tests

* Update kopf command in operator tests

* Romve async from  operator test

* Ensure Operator is running for tests

* Rewrite KubeCluster2 test with async cm

* Clean up cluster in tests

* Remove operator tests

* Update oudated class name V1beta1Eviction to V1Eviction

* Add operator test back

* delete test cluster

* Add Client test to operator tests

* Start the operator synchronously

* Revert to op tests without kubecluster2

* Remove scaling from operator tests

* Add delete to KubeCluster2

* Add missing Client import

* Reformat operator code

* Add kubecluster2 tests

* Create and delete cluster with cm

* test_fixtures_kubecluster2 depends on kopf_runner and gen_cluster2

* test needs to be called asynchronously

* Close cm

* gen_cluster2() is a cm

* Close cluster and client in tests

* Patch daskcluster resource before deleting

* Add async to KubeCluster2

* Remove delete handler

* Ensure cluster is scaled down with dask rpc

* Wait for cluster pods to be ready

* Wait for cluster resources after creating them

* Remove async from KubeCluster2

* Patch dask cluster resource

* Fix syntax error in kubectl command

* Explicitly close the client

* Close rpc objects

* Don't delete cluster twice

* Mark test as asyncio

* Remove Client from test

* Patch daskcluster CR before deleting

* Instantiate KubeCluster2 with a cm

* Fix KubeCluster cm impl

* Wait for cluster resources to be deleted

* Split up kubecluster2 tests

* Add test_basic for kubecluster2

* Add test_scale_up_down for KubeCluster2

* Remove test_scale_up_down

* Add test_scale_up_down back

* Clean up code

* Delete scale_cluster_up_and_down test

* Remove test_basic_kubecluster test

* Add TODO for default namespace

Co-authored-by: Matthew Murray <mmurray@dgx15.aselab.nvidia.com>

* Support HPA style autoscaling (#418)

* Create a scheduler pod when DaskCluster resource is created

* Add tests for creating scheduler pod and service

* Revert "Add tests for creating scheduler pod and service"

This reverts commit bf58f6a.

* Rebase fix merge conflicts

* Check that scheduler pod and service are created

* Fix Dask cluster tests

* Connect to scheduler with RPC

* Restart checks

* Comment out rpc

* RPC logic for scaling down workers

* Fix operator test, worker name changed

* Remove pytest timeout decorator from test cluster

* Remove version req on nest-asyncio

* Add version req on nest-asyncio

* Restart github actions

* Add timeout back

* Get rid of nest-asyncio

* Add a TODO for replacing 'localhost' with service address in rpc

* Update TODO rpc address

* Add a cluster manager tht supports that Dask Operator

* Add some more methods t KubeCluster2

* Add class method to cm for connecting to existing cluster manager

* Add build func for cluster and create daskcluster in KubeCluster2

* Restart checks

* Add cluster auth to KubeCluster2

* Create cluster resource and get pod names with kubectl instead of python client

* Use kubectl in _start

* Add scale and adapt methods

* Connect cluster manager to cluster and add additional worker method

* Add test for KubeCluster2

* Remove rel import from test

* Remove new test

* Restart checks

* Address review commments

* Address comments on temporaryfile and cm docstring

* Delete unused var

* Test check without Operator

* Add operator changes back

* Add cm tests

* remove async from KubeCluster2 instance

* restart checks

* Add asserts to KubeCluster2 tests

* Switch to kubernetes-asyncio

* Simplify operator tests

* Update kopf command in operator tests

* Romve async from  operator test

* Ensure Operator is running for tests

* Rewrite KubeCluster2 test with async cm

* Clean up cluster in tests

* Remove operator tests

* Update oudated class name V1beta1Eviction to V1Eviction

* Add operator test back

* delete test cluster

* Add Client test to operator tests

* Start the operator synchronously

* Revert to op tests without kubecluster2

* Remove scaling from operator tests

* Add delete to KubeCluster2

* Add missing Client import

* Reformat operator code

* Add kubecluster2 tests

* Create and delete cluster with cm

* test_fixtures_kubecluster2 depends on kopf_runner and gen_cluster2

* test needs to be called asynchronously

* Close cm

* gen_cluster2() is a cm

* Close cluster and client in tests

* Patch daskcluster resource before deleting

* Add async to KubeCluster2

* Remove delete handler

* Ensure cluster is scaled down with dask rpc

* Wait for cluster pods to be ready

* Wait for cluster resources after creating them

* Remove async from KubeCluster2

* Patch dask cluster resource

* Fix syntax error in kubectl command

* Explicitly close the client

* Close rpc objects

* Don't delete cluster twice

* Mark test as asyncio

* Remove Client from test

* Patch daskcluster CR before deleting

* Instantiate KubeCluster2 with a cm

* Fix KubeCluster cm impl

* Wait for cluster resources to be deleted

* Split up kubecluster2 tests

* Add test_basic for kubecluster2

* Add test_scale_up_down for KubeCluster2

* Remove test_scale_up_down

* Add test_scale_up_down back

* Clean up code

* Delete scale_cluster_up_and_down test

* Remove test_basic_kubecluster test

* Add TODO for default namespace

* Add autoscaling to operator

* Clean up code and wait for service

* Fix bug workers not deleted in simplecluster tests

Co-authored-by: Matthew Murray <mmurray@dgx15.aselab.nvidia.com>

* Remove autoscaling (#426)

* Support Multiple Clusters (#425)

* Resolve name conflicts in wg

* Add test for multiple clusters

* Singleton Class for Dask RPC (#427)

* Resolve name conflicts in wg

* Add test for multiple clusters

* Add singleton class for dask-rpc

* Clean up PR comments

* Move some function to utils

* Add check for kubectl dependecy in operator (#428)

Co-authored-by: Jacob Tomlinson <jacobtomlinson@users.noreply.github.com>

* Add properties to dask custom resources definitions (#429)

* Add properties dask custom resources definitions

* Preserve unknown fields in Status

* Preserve all unknown fields

* Remove preserve unknown fields

* Clean up PR

* Install kubectl (#431)

* Fix tests (#432)

* Install kubectl

* Removetimeout from simplecluster test

* Revert "Fix tests (#432)" (#433)

This reverts commit e61cf1e.

* Fix docker file to Start the Operator in a Running Pod (#434)

* Fix docker file to Start the Operator in a Running Pod

* Change cr and crb

* Change manifest file

* Dask Operator Documentation (#435)

* Fix docker file to Start the Operator in a Running Pod

* Change cr and crb

* Change manifest file

* Add documentation for the operator

* Add python labels to python code

* Fix doc not rendering correctly

* Fix doc not rendering correctly

* Fix doc not rendering correctly

* Address review comments

* Fix rendering issue

* Fix rendering issue

* Fix rendering issue

* Move dedscription of kubecluster2

* Fix dask op description

* Address comments from review

* Link API in kubecluster2 docs

* Detail KubeCluster2 parameter definitions and examples in Configuration section

* Fix env example not rendering

* Add documentation for kubecluster2 to dask kubernetes home page

* Expanded on some things

* Bump pre-commit things

Co-authored-by: Jacob Tomlinson <jtomlinson@nvidia.com>

* Rename dask_kubernetes.KubeCluster2 to dask_kubernetes.experimental.KubeCluster (#437)

* Remove kubectl dependency from operator (#438)

* Remove kubectl dependency from operator

* Remove stray self arg

* Reuse existing auth code

Co-authored-by: Matthew Murray <41342305+Matt711@users.noreply.github.com>
Co-authored-by: Matthew Murray <mmurray@dgx15.aselab.nvidia.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants