use modin in remote context via rpyc#1659
use modin in remote context via rpyc#1659anmyachev wants to merge 31 commits intomodin-project:masterfrom
Conversation
Signed-off-by: Anatoly Myachev <anatoly.myachev@intel.com>
Signed-off-by: Anatoly Myachev <anatoly.myachev@intel.com>
…test-autoscaler
…test-autoscaler
Signed-off-by: Anatoly Myachev <anatoly.myachev@intel.com>
Signed-off-by: Anatoly Myachev <anatoly.myachev@intel.com>
Signed-off-by: Anatoly Myachev <anatoly.myachev@intel.com>
Signed-off-by: Anatoly Myachev <anatoly.myachev@intel.com>
Signed-off-by: Anatoly Myachev <anatoly.myachev@intel.com>
Signed-off-by: Anatoly Myachev <anatoly.myachev@intel.com>
Signed-off-by: Anatoly Myachev <anatoly.myachev@intel.com>
6f00888 to
44e807d
Compare
Signed-off-by: Anatoly Myachev <anatoly.myachev@intel.com>
Signed-off-by: Anatoly Myachev <anatoly.myachev@intel.com>
Signed-off-by: Anatoly Myachev <anatoly.myachev@intel.com>
Signed-off-by: Anatoly Myachev <anatoly.myachev@intel.com>
…test-autoscaler
Signed-off-by: Anatoly Myachev <anatoly.myachev@intel.com>
|
Last days, I have been working on launching the mortgage benchmark on a remote modin context. pd.read_csv(csv_path, names = ["seller_name", "new"], delimiter = "|")caused the error: pickle protocol must be <= 4kw = {'names': ['loan_id', 'orig_channel', 'seller_name', 'orig_interest_rate', 'orig_upb', 'orig_loan_term', 'orig_date', 'first_pay_date', 'orig_ltv', 'orig_cltv', 'num_borrowers', 'dti', 'borrower_credit_score', 'first_home_buyer', 'loan_purpose', 'property_type', 'num_units', 'occupancy_status', 'property_state', 'zip', 'mortgage_insurance_percent', 'product_type', 'coborrow_credit_score', 'mortgage_insurance_type', 'relocation_mortgage_indicator', 'year_quarter_ignore'], 'index_col': False}
print(pd.read_csv(acq_path,
dtype={'loan_id': 'int64', 'orig_channel': 'category', 'seller_name': 'category', 'orig_interest_rate': 'float64', 'orig_upb': 'int64', 'orig_loan_term': 'int64', 'orig_ltv': 'float64', 'orig_cltv': 'float64', 'num_borrowers': 'float64', 'dti': 'float64', 'borrower_credit_score': 'float64', 'first_home_buyer': 'category', 'loan_purpose': 'category', 'property_type': 'category', 'num_units': 'int64', 'occupancy_status': 'category', 'property_state': 'category', 'zip': 'int64', 'mortgage_insurance_percent': 'float64', 'product_type': 'category', 'coborrow_credit_score': 'float64', 'mortgage_insurance_type': 'float64', 'relocation_mortgage_indicator': 'category'},
parse_dates=['orig_date', 'first_pay_date'], skiprows=1, delimiter="|", **kw))caused the error: TypeError: Cannot interpret '{'loan_id': 'int64', 'orig_channel': 'category', 'seller_name': 'category', 'orig_interest_rate': 'float64', 'orig_upb': 'int64', 'orig_loan_term': 'int64', 'orig_ltv': 'float64', 'orig_cltv': 'float64', 'num_borrowers': 'float64', 'dti': 'float64', 'borrower_credit_score': 'float64', 'first_home_buyer': 'category', 'loan_purpose': 'category', 'property_type': 'category', 'num_units': 'int64', 'occupancy_status': 'category', 'property_state': 'category', 'zip': 'int64', 'mortgage_insurance_percent': 'float64', 'product_type': 'category', 'coborrow_credit_score': 'float64', 'mortgage_insurance_type': 'float64', 'relocation_mortgage_indicator': 'category'}' as a data typeIn the case of The following modification was used as a quick solution to this problem: # read_csv = _make_parser_func(sep=",")
def read_csv(*args, **kwargs):
from .. import execution_engine, _create_cloud_conn
conn = _create_cloud_conn()
import rpyc
args = rpyc.classic.deliver(conn, args)
kwargs = rpyc.classic.deliver(conn, kwargs)
if execution_engine.get() == "Cloudray":
read_csv = _create_cloud_conn().modules["modin.pandas"].read_csv
return read_csv(*args, **kwargs)
else:
return _make_parser_func(sep=",")(*args, **kwargs) |
|
Other problems arose when calling some methods of the joined_df.loc[:, loc_list]caused the error: TypeError: 'list' object is not callableThis workaround allows me to get around this problem, but only for research purposes. from modin import _create_cloud_conn
conn = _create_cloud_conn()
loc_list = [
"loan_id",
"monthly_reporting_period",
"current_loan_delinquency_status",
"current_actual_upb",
]
import rpyc
loc_list = rpyc.classic.deliver(conn, loc_list)
test = gdf.loc[:,loc_list]UPD: Adding next lines into if name_pack == "builtins.list" and name == "__call__":
continueclass_factorydef class_factory(id_pack, methods):
"""Creates a netref class proxying the given class
:param id_pack: the id pack used for proxy communication
:param methods: a list of ``(method name, docstring)`` tuples, of the methods that the class defines
:returns: a netref class
"""
ns = {"__slots__": (), "__class__": None}
name_pack = id_pack[0]
class_descriptor = None
if name_pack is not None:
# attempt to resolve __class__ using sys.modules (i.e. builtins and imported modules)
_module = None
cursor = len(name_pack)
while cursor != -1:
_module = sys.modules.get(name_pack[:cursor])
if _module is None:
cursor = name_pack[:cursor].rfind('.')
continue
_class_name = name_pack[cursor + 1:]
_class = getattr(_module, _class_name, None)
if _class is not None and hasattr(_class, '__class__'):
class_descriptor = NetrefClass(_class)
break
ns['__class__'] = class_descriptor
netref_name = class_descriptor.owner.__name__ if class_descriptor is not None else name_pack
# create methods that must perform a syncreq
for name, doc in methods:
name = str(name) # IronPython issue #10
# only create methods that wont shadow BaseNetref during merge for mro
if name not in LOCAL_ATTRS: # i.e. `name != __class__`
if name_pack == "builtins.list" and name == "__call__":
continue
ns[name] = _make_method(name, doc)
return type(netref_name, (BaseNetref,), ns) |
|
At the moment, part of the mortgage benchmark with I’m currently investigating errors that occur when |
Signed-off-by: Anatoly Myachev <anatoly.myachev@intel.com>
Signed-off-by: Anatoly Myachev <anatoly.myachev@intel.com>
Signed-off-by: Anatoly Myachev <anatoly.myachev@intel.com>
Signed-off-by: Anatoly Myachev <anatoly.myachev@intel.com>
Signed-off-by: Anatoly Myachev <anatoly.myachev@intel.com>
|
Calling this: tmpdf = tmpdf.groupby(["loan_id", "josh_mody_n"], as_index=False).agg({"delinquency_12": "max", "upb_12": "min"})caused the error: TypeError: __init__() missing 2 required positional arguments: 'conn' and 'id_pack'This happens because the rpyc wrapper over the dict class replaces its attributes(when proxy object is creating) so that when |
|
Another problem: EOFError: connection closed by peer One of the reasons is the excess of timeouts that are set for the rpyc.zerodeploy.DeployedServer` object. Since setting timeouts for this entity is not implied, we will start the server during execution |
Signed-off-by: Anatoly Myachev <anatoly.myachev@intel.com>
Signed-off-by: Anatoly Myachev <anatoly.myachev@intel.com>
Signed-off-by: Anatoly Myachev <anatoly.myachev@intel.com>
What do these changes do?
flake8 modinblack --check modingit commit -s