|
| 1 | +import json |
1 | 2 | import os |
2 | 3 | import sys |
3 | 4 | from unittest import mock |
@@ -348,3 +349,149 @@ def test_login_deletes_preserve_intent_cookie_on_valid_redirect(self, mock_stats |
348 | 349 | for call in mock_web.setcookie.call_args_list: |
349 | 350 | assert call[0][0] != "pending_action" |
350 | 351 | mock_web.seeother.assert_called_with("/account/books") |
| 352 | + |
| 353 | + |
| 354 | +class TestOtpServiceS3Auth: |
| 355 | + """Tests for S3 key validation on /account/otp/issue and /account/otp/redeem.""" |
| 356 | + |
| 357 | + def _make_web_mock(self, auth_header=""): |
| 358 | + m = mock.MagicMock() |
| 359 | + m.ctx.env = {"HTTP_AUTHORIZATION": auth_header, "HTTP_X_FORWARDED_FOR": "1.2.3.4"} |
| 360 | + m.input.return_value = web.storage(email="test@example.com", ip="1.2.3.4", challenge_url="", sendmail="false", otp="123456") |
| 361 | + m.safestr.side_effect = lambda x: x |
| 362 | + return m |
| 363 | + |
| 364 | + @mock.patch("openlibrary.plugins.upstream.account.InternetArchiveAccount") |
| 365 | + @mock.patch("openlibrary.plugins.upstream.account.web") |
| 366 | + def test_issue_missing_auth_header(self, mock_web, mock_ia): |
| 367 | + mock_web.ctx.env = {} |
| 368 | + result = account.otp_service_issue().POST() |
| 369 | + body = json.loads(result.rawtext) |
| 370 | + assert body["error"] == "missing_or_invalid_authorization" |
| 371 | + mock_ia.s3auth.assert_not_called() |
| 372 | + |
| 373 | + @mock.patch("openlibrary.plugins.upstream.account.InternetArchiveAccount") |
| 374 | + @mock.patch("openlibrary.plugins.upstream.account.web") |
| 375 | + def test_issue_empty_secret_rejected(self, mock_web, mock_ia): |
| 376 | + mock_web.ctx.env = {"HTTP_AUTHORIZATION": "LOW access:"} |
| 377 | + result = account.otp_service_issue().POST() |
| 378 | + body = json.loads(result.rawtext) |
| 379 | + assert body["error"] == "missing_or_invalid_authorization" |
| 380 | + mock_ia.s3auth.assert_not_called() |
| 381 | + |
| 382 | + @mock.patch("openlibrary.plugins.upstream.account.InternetArchiveAccount") |
| 383 | + @mock.patch("openlibrary.plugins.upstream.account.web") |
| 384 | + def test_issue_invalid_keys_rejected(self, mock_web, mock_ia): |
| 385 | + mock_ia.s3auth.return_value = {"error": "invalid_s3keys", "code": 401} |
| 386 | + mock_web.ctx.env = {"HTTP_AUTHORIZATION": "LOW badaccess:badsecret"} |
| 387 | + result = account.otp_service_issue().POST() |
| 388 | + body = json.loads(result.rawtext) |
| 389 | + assert body["error"] == "unauthorized" |
| 390 | + |
| 391 | + @mock.patch("openlibrary.plugins.upstream.account.InternetArchiveAccount") |
| 392 | + @mock.patch("openlibrary.plugins.upstream.account.web") |
| 393 | + def test_issue_auth_service_5xx_returns_specific_error(self, mock_web, mock_ia): |
| 394 | + mock_ia.s3auth.return_value = {"error": "service error", "code": 503} |
| 395 | + mock_web.ctx.env = {"HTTP_AUTHORIZATION": "LOW access:secret"} |
| 396 | + result = account.otp_service_issue().POST() |
| 397 | + body = json.loads(result.rawtext) |
| 398 | + assert body["error"] == "auth_service_unavailable" |
| 399 | + |
| 400 | + @mock.patch("openlibrary.plugins.upstream.account.OTP") |
| 401 | + @mock.patch("openlibrary.plugins.upstream.account.InternetArchiveAccount") |
| 402 | + @mock.patch("openlibrary.plugins.upstream.account.web") |
| 403 | + def test_issue_valid_keys_proceeds(self, mock_web, mock_ia, mock_otp): |
| 404 | + mock_ia.s3auth.return_value = {"success": True, "itemname": "@testuser"} |
| 405 | + mock_web.ctx.env = { |
| 406 | + "HTTP_AUTHORIZATION": "LOW goodaccess:goodsecret", |
| 407 | + "HTTP_X_FORWARDED_FOR": "1.2.3.4", |
| 408 | + } |
| 409 | + mock_web.input.return_value = web.storage(email="test@example.com", ip="1.2.3.4", challenge_url="", sendmail="false") |
| 410 | + mock_otp.generate.return_value = "abc123" |
| 411 | + mock_otp.is_ratelimited.return_value = None |
| 412 | + mock_otp.verify_service.return_value = True |
| 413 | + result = account.otp_service_issue().POST() |
| 414 | + body = json.loads(result.rawtext) |
| 415 | + assert body == {"success": "issued"} |
| 416 | + |
| 417 | + @mock.patch("openlibrary.plugins.upstream.account.InternetArchiveAccount") |
| 418 | + @mock.patch("openlibrary.plugins.upstream.account.web") |
| 419 | + def test_redeem_missing_auth_header(self, mock_web, mock_ia): |
| 420 | + mock_web.ctx.env = {} |
| 421 | + result = account.otp_service_redeem().POST() |
| 422 | + body = json.loads(result.rawtext) |
| 423 | + assert body["error"] == "missing_or_invalid_authorization" |
| 424 | + mock_ia.s3auth.assert_not_called() |
| 425 | + |
| 426 | + @mock.patch("openlibrary.plugins.upstream.account.InternetArchiveAccount") |
| 427 | + @mock.patch("openlibrary.plugins.upstream.account.web") |
| 428 | + def test_redeem_invalid_keys_rejected(self, mock_web, mock_ia): |
| 429 | + mock_ia.s3auth.return_value = {"error": "invalid_s3keys", "code": 401} |
| 430 | + mock_web.ctx.env = {"HTTP_AUTHORIZATION": "LOW bad:creds"} |
| 431 | + result = account.otp_service_redeem().POST() |
| 432 | + body = json.loads(result.rawtext) |
| 433 | + assert body["error"] == "unauthorized" |
| 434 | + |
| 435 | + @mock.patch("openlibrary.plugins.upstream.account.OTP") |
| 436 | + @mock.patch("openlibrary.plugins.upstream.account.InternetArchiveAccount") |
| 437 | + @mock.patch("openlibrary.plugins.upstream.account.web") |
| 438 | + def test_redeem_valid_keys_proceeds(self, mock_web, mock_ia, mock_otp): |
| 439 | + mock_ia.s3auth.return_value = {"success": True, "itemname": "@testuser"} |
| 440 | + mock_web.ctx.env = { |
| 441 | + "HTTP_AUTHORIZATION": "LOW goodaccess:goodsecret", |
| 442 | + "HTTP_X_FORWARDED_FOR": "1.2.3.4", |
| 443 | + } |
| 444 | + mock_web.input.return_value = web.storage(email="test@example.com", ip="1.2.3.4", otp="abc123") |
| 445 | + mock_otp.is_valid.return_value = True |
| 446 | + result = account.otp_service_redeem().POST() |
| 447 | + body = json.loads(result.rawtext) |
| 448 | + assert body == {"success": "redeemed"} |
| 449 | + |
| 450 | + |
| 451 | +class TestParseLowAuthHeader: |
| 452 | + """Unit tests for the _parse_low_auth_header() module-level helper.""" |
| 453 | + |
| 454 | + @mock.patch("openlibrary.plugins.upstream.account.web") |
| 455 | + def test_missing_header_raises(self, mock_web): |
| 456 | + mock_web.ctx.env = {} |
| 457 | + with pytest.raises(ValueError, match="Missing or invalid"): |
| 458 | + account._parse_low_auth_header() |
| 459 | + |
| 460 | + @mock.patch("openlibrary.plugins.upstream.account.web") |
| 461 | + def test_bearer_prefix_rejected(self, mock_web): |
| 462 | + mock_web.ctx.env = {"HTTP_AUTHORIZATION": "Bearer sometoken"} |
| 463 | + with pytest.raises(ValueError, match="Missing or invalid"): |
| 464 | + account._parse_low_auth_header() |
| 465 | + |
| 466 | + @mock.patch("openlibrary.plugins.upstream.account.web") |
| 467 | + def test_no_colon_raises(self, mock_web): |
| 468 | + mock_web.ctx.env = {"HTTP_AUTHORIZATION": "LOW accessonly"} |
| 469 | + with pytest.raises(ValueError, match="Malformed"): |
| 470 | + account._parse_low_auth_header() |
| 471 | + |
| 472 | + @mock.patch("openlibrary.plugins.upstream.account.web") |
| 473 | + def test_empty_secret_raises(self, mock_web): |
| 474 | + mock_web.ctx.env = {"HTTP_AUTHORIZATION": "LOW access:"} |
| 475 | + with pytest.raises(ValueError, match="Empty"): |
| 476 | + account._parse_low_auth_header() |
| 477 | + |
| 478 | + @mock.patch("openlibrary.plugins.upstream.account.web") |
| 479 | + def test_empty_access_raises(self, mock_web): |
| 480 | + mock_web.ctx.env = {"HTTP_AUTHORIZATION": "LOW :secret"} |
| 481 | + with pytest.raises(ValueError, match="Empty"): |
| 482 | + account._parse_low_auth_header() |
| 483 | + |
| 484 | + @mock.patch("openlibrary.plugins.upstream.account.web") |
| 485 | + def test_valid_returns_stripped_tuple(self, mock_web): |
| 486 | + mock_web.ctx.env = {"HTTP_AUTHORIZATION": "LOW myaccess : mysecret "} |
| 487 | + access, secret = account._parse_low_auth_header() |
| 488 | + assert access == "myaccess" |
| 489 | + assert secret == "mysecret" |
| 490 | + |
| 491 | + @mock.patch("openlibrary.plugins.upstream.account.web") |
| 492 | + def test_colon_in_secret_preserved(self, mock_web): |
| 493 | + # S3 secrets can contain colons; only split on the first one |
| 494 | + mock_web.ctx.env = {"HTTP_AUTHORIZATION": "LOW access:sec:ret"} |
| 495 | + access, secret = account._parse_low_auth_header() |
| 496 | + assert access == "access" |
| 497 | + assert secret == "sec:ret" |
0 commit comments