Skip to content

Commit f1839e4

Browse files
committed
Fix for coding style, part 2.
1 parent ce9e1d7 commit f1839e4

8 files changed

Lines changed: 58 additions & 27 deletions

File tree

src/meltano/cli/state.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,9 @@ def meltano_state(project: Project, ctx: click.Context):
110110
"""
111111
_, sessionmaker = project_engine(project)
112112
session = sessionmaker()
113-
ctx.obj[STATE_SERVICE_KEY] = StateService(session, ProjectSettingsService(project)) # noqa: WPS204
113+
ctx.obj[STATE_SERVICE_KEY] = StateService(
114+
session, ProjectSettingsService(project)
115+
) # noqa: WPS204
114116

115117

116118
@meltano_state.command(cls=InstrumentedCmd, name="list")
@@ -311,15 +313,16 @@ def clear_state(ctx: click.Context, project: Project, state_id: str, force: bool
311313
)
312314
state_service.clear_state(state_id)
313315

316+
314317
@meltano_state.command(cls=InstrumentedCmd, name="vacuum")
315318
@click.option("--pattern", type=str, help="Filter state IDs by pattern.")
316319
@click.option("--rows-to-keep", type=int, help="Rows to keep after vacuuming.")
317320
@click.pass_context
318321
@pass_project()
319-
def vacuum_state(project: Project, ctx: click.Context, pattern: str | None, rows_to_keep: int | None):
322+
def vacuum_state(
323+
project: Project, ctx: click.Context, pattern: str | None, rows_to_keep: int | None
324+
):
320325
"""Housekeep state storage to free up disk space."""
321326
state_service: StateService = ctx.obj[STATE_SERVICE_KEY]
322327
delete_count = state_service.vacuum(pattern, rows_to_keep)
323-
logger.info(
324-
f"Vacuumed state storage, removing {delete_count} rows from table."
325-
)
328+
logger.info(f"Vacuumed state storage, removing {delete_count} rows from table.")

src/meltano/core/block/extract_load.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -358,7 +358,9 @@ def state_service(self) -> StateService:
358358
"""
359359
if not self._state_service:
360360
if self.has_state():
361-
self._state_service = StateService(self.context.session, ProjectSettingsService(self.context.project))
361+
self._state_service = StateService(
362+
self.context.session, ProjectSettingsService(self.context.project)
363+
)
362364
else:
363365
raise BlockSetHasNoStateError()
364366
return self._state_service

src/meltano/core/job/job.py

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from datetime import datetime, timedelta
1010
from enum import Enum
1111

12-
from sqlalchemy import Column, literal, types, delete, desc
12+
from sqlalchemy import Column, delete, desc, literal, types
1313
from sqlalchemy.ext.hybrid import Comparator, hybrid_property
1414
from sqlalchemy.ext.mutable import MutableDict
1515

@@ -346,15 +346,26 @@ def vaccum(session, job_name: str, rows_to_keep: int):
346346
the number of rows deleted
347347
"""
348348
# get the (n+1)-th latest row, or None if there is less or equal than n rows in the table
349-
latest_job_to_delete = session.query(Job).filter(Job.job_name == job_name).order_by(desc(Job.id)).offset(rows_to_keep).first()
349+
latest_job_to_delete = (
350+
session.query(Job)
351+
.filter(Job.job_name == job_name)
352+
.order_by(desc(Job.id))
353+
.offset(rows_to_keep)
354+
.first()
355+
)
350356
if latest_job_to_delete is not None:
351357
id_threshold = latest_job_to_delete.id
352-
delete_count = session.query(Job).filter(Job.job_name == job_name, Job.id <= id_threshold).count()
353-
session.execute(delete(Job).where(Job.job_name == job_name, Job.id <= id_threshold))
358+
delete_count = (
359+
session.query(Job)
360+
.filter(Job.job_name == job_name, Job.id <= id_threshold)
361+
.count()
362+
)
363+
session.execute(
364+
delete(Job).where(Job.job_name == job_name, Job.id <= id_threshold)
365+
)
354366
session.commit()
355367
return delete_count
356-
else:
357-
return 0
368+
return 0
358369

359370
def _heartbeat(self):
360371
"""Update last_heartbeat_at for this job in the db."""

src/meltano/core/plugin/singer/tap.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
from meltano.core.behavior.hookable import hook
1919
from meltano.core.plugin.error import PluginExecutionError, PluginLacksCapabilityError
2020
from meltano.core.plugin_invoker import PluginInvoker
21-
from meltano.core.setting_definition import SettingDefinition, SettingKind
2221
from meltano.core.project_settings_service import ProjectSettingsService
22+
from meltano.core.setting_definition import SettingDefinition, SettingKind
2323
from meltano.core.state_service import SINGER_STATE_KEY, StateService
2424
from meltano.core.utils import file_has_data, flatten
2525

@@ -290,7 +290,9 @@ async def look_up_state( # noqa: WPS231, WPS213
290290

291291
return
292292
# the `state.json` is stored in the database
293-
state = StateService(elt_context.session, ProjectSettingsService(elt_context.project)).get_state(elt_context.job.job_name)
293+
state = StateService(
294+
elt_context.session, ProjectSettingsService(elt_context.project)
295+
).get_state(elt_context.job.job_name)
294296
if state:
295297
if state.get(SINGER_STATE_KEY):
296298
with state_path.open("w") as state_file:

src/meltano/core/plugin/singer/target.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,10 @@
1010

1111
from meltano.core.behavior.hookable import hook
1212
from meltano.core.job import Job, Payload
13-
from meltano.core.project import Project
1413
from meltano.core.plugin_invoker import PluginInvoker
15-
from meltano.core.setting_definition import SettingDefinition
14+
from meltano.core.project import Project
1615
from meltano.core.project_settings_service import ProjectSettingsService
16+
from meltano.core.setting_definition import SettingDefinition
1717
from meltano.core.state_service import SINGER_STATE_KEY, StateService
1818

1919
from . import PluginType, SingerPlugin
@@ -43,7 +43,9 @@ def __init__(
4343
"""
4444
self.job = job
4545
self.session = session
46-
self.state_service = state_service or StateService(session, ProjectSettingsService(project))
46+
self.state_service = state_service or StateService(
47+
session, ProjectSettingsService(project)
48+
)
4749
self.payload_flag = payload_flag
4850

4951
def writeline(self, line: str):
@@ -160,5 +162,7 @@ def setup_bookmark_writer(self, plugin_invoker: PluginInvoker):
160162

161163
plugin_invoker.add_output_handler(
162164
plugin_invoker.StdioSource.STDOUT,
163-
BookmarkWriter(elt_context.job, elt_context.project, elt_context.session, payload_flag),
165+
BookmarkWriter(
166+
elt_context.job, elt_context.project, elt_context.session, payload_flag
167+
),
164168
)

src/meltano/core/state_service.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,19 @@ class StateService:
3131
Currently only manages Singer state for Extract and Load jobs.
3232
"""
3333

34-
def __init__(self, session: object = None, settings = None):
34+
def __init__(self, session: object = None, settings=None):
3535
"""Create a StateService object.
3636
3737
Args:
3838
session: the session to use for interacting with the db
3939
settings: a ProjectSettingsService for the current project
4040
"""
4141
self.session = session
42-
self.max_rows_per_state: int | None = settings.get("database_max_rows_per_state") if settings is not None else None
42+
self.max_rows_per_state: int | None = (
43+
settings.get("database_max_rows_per_state")
44+
if settings is not None
45+
else None
46+
)
4347
if self.max_rows_per_state is not None and self.max_rows_per_state < 1:
4448
self.max_rows_per_state = None
4549

@@ -207,8 +211,7 @@ def move_state(self, state_id_src: str, state_id_dst: str):
207211
self.clear_state(state_id_src)
208212

209213
def _vacuum_if_necessary(self):
210-
"""Execute vacuuming if configured.
211-
"""
214+
"""Execute vacuuming if configured."""
212215
try:
213216
if self.max_rows_per_state is not None and self.max_rows_per_state > 0:
214217
delete_count = self.vacuum(None, self.max_rows_per_state)
@@ -227,7 +230,7 @@ def vacuum(self, state_id_pattern: str | None, rows_to_keep: int | None):
227230
Returns:
228231
how many rows are deleted in total
229232
"""
230-
return sum([
233+
return sum(
231234
Job.vaccum(self.session, state_id, rows_to_keep or 10)
232235
for state_id in self.state_store_manager.get_state_ids(state_id_pattern)
233-
])
236+
)

tests/meltano/core/plugin/singer/test_tap.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,9 @@ async def test_look_up_state( # noqa: WPS213, WPS217
7878
)
7979

8080
invoker = plugin_invoker_factory(subject, context=elt_context)
81-
state_service = StateService(session, ProjectSettingsService(elt_context.project))
81+
state_service = StateService(
82+
session, ProjectSettingsService(elt_context.project)
83+
)
8284

8385
@contextmanager
8486
def create_job():

tests/meltano/core/test_state_service.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,9 +87,13 @@ def test_move(self, state_ids, state_service):
8787
assert not state_service.get_state(state_id_src)
8888
assert state_service.get_state(state_id_dst) == state_src
8989

90-
def test_vacuum(self, state_service, state_ids_with_expected_states, state_ids_with_jobs):
90+
def test_vacuum(
91+
self, state_service, state_ids_with_expected_states, state_ids_with_jobs
92+
):
9193
rows_to_keep = 5
92-
expected_delete_count = sum([max(len(jobs) - rows_to_keep, 0) for jobs in state_ids_with_jobs.values()])
94+
expected_delete_count = sum(
95+
max(len(jobs) - rows_to_keep, 0) for jobs in state_ids_with_jobs.values()
96+
)
9397
delete_count = state_service.vacuum(None, rows_to_keep)
9498
assert state_service.list_state() == {
9599
state_id: expected_state

0 commit comments

Comments
 (0)