# Imports used throughout these snippets (kept at module level so every
# function below can rely on them)
import pandas as pd
from pathlib import Path


# Standardize Your Data Import Process
def load_dataset(file_path, **kwargs):
    """
    Load data from various file formats while handling common issues.

    Args:
        file_path (str): Path to the data file
        **kwargs: Additional arguments to pass to the appropriate pandas reader

    Returns:
        pd.DataFrame: Loaded and initially processed dataframe
    """
    file_type = Path(file_path).suffix.lower()

    # Dictionary of file handlers keyed by extension
    handlers = {
        '.csv': pd.read_csv,
        '.xlsx': pd.read_excel,
        '.json': pd.read_json,
        '.parquet': pd.read_parquet,
    }

    # Look up the appropriate reader function
    reader = handlers.get(file_type)
    if reader is None:
        raise ValueError(f"Unsupported file type: {file_type}")

    # Load data, passing any reader-specific options through
    df = reader(file_path, **kwargs)

    # Initial cleaning steps
    df.columns = df.columns.str.strip().str.lower()  # Standardize column names
    df = df.replace('', pd.NA)  # Convert empty strings to NA
    return df
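
# A minimal smoke test for load_dataset, assuming write access to the system
# temp directory; the column names and values below are made up for illustration.
if __name__ == '__main__':
    import os
    import tempfile

    with tempfile.NamedTemporaryFile('w', suffix='.csv', delete=False) as tmp:
        tmp.write(' Name ,VALUE\nalpha,1\nbeta,2\n')
        tmp_path = tmp.name
    demo = load_dataset(tmp_path)
    assert list(demo.columns) == ['name', 'value']  # Names stripped and lowercased
    os.unlink(tmp_path)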

# Implement Automated Data Validation
def validate_dataset(df, validation_rules=None):
    """
    Apply validation rules to a dataframe and return validation results.

    Args:
        df (pd.DataFrame): Input dataframe
        validation_rules (dict): Dictionary of column names and their validation rules

    Returns:
        dict: Validation results with issues found
    """
    # The default rules are placeholders: replace 'numeric_columns' and
    # 'date_columns' with the actual column names in your dataframe.
    if validation_rules is None:
        validation_rules = {
            'numeric_columns': {
                'check_type': 'numeric',
                'min_value': 0,
                'max_value': 1000000
            },
            'date_columns': {
                'check_type': 'date',
                'min_date': '2000-01-01',
                'max_date': '2025-12-31'
            }
        }

    validation_results = {}
    for column, rules in validation_rules.items():
        if column not in df.columns:
            continue
        issues = []

        # Check for missing values
        missing_count = df[column].isna().sum()
        if missing_count > 0:
            issues.append(f"Found {missing_count} missing values")

        # Type-specific validations
        if rules['check_type'] == 'numeric':
            if not pd.api.types.is_numeric_dtype(df[column]):
                issues.append("Column should be numeric")
            else:
                out_of_range = df[
                    (df[column] < rules['min_value']) |
                    (df[column] > rules['max_value'])
                ]
                if len(out_of_range) > 0:
                    issues.append(f"Found {len(out_of_range)} values outside allowed range")
        elif rules['check_type'] == 'date':
            # Coerce to datetime and flag values outside the allowed window
            dates = pd.to_datetime(df[column], errors='coerce')
            out_of_range = dates[
                (dates < pd.Timestamp(rules['min_date'])) |
                (dates > pd.Timestamp(rules['max_date']))
            ]
            if len(out_of_range) > 0:
                issues.append(f"Found {len(out_of_range)} dates outside allowed range")

        validation_results[column] = issues
    return validation_results
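
# A quick illustration of validate_dataset with a throwaway dataframe; the
# 'price' column and its bounds are fabricated for this example.
if __name__ == '__main__':
    _prices = pd.DataFrame({'price': [10, -5, 2_000_000, None]})
    _rules = {'price': {'check_type': 'numeric', 'min_value': 0, 'max_value': 1_000_000}}
    # Expect: {'price': ['Found 1 missing values', 'Found 2 values outside allowed range']}
    print(validate_dataset(_prices, _rules))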

# Create a Data Cleaning Pipeline
class DataCleaningPipeline:
    """
    A modular pipeline for cleaning data with customizable steps.
    """

    def __init__(self):
        self.steps = []

    def add_step(self, name, function):
        """Add a cleaning step."""
        self.steps.append({'name': name, 'function': function})

    def execute(self, df):
        """Execute all cleaning steps in order, stopping at the first failure."""
        results = []
        current_df = df.copy()  # Work on a copy so the input is left untouched
        for step in self.steps:
            try:
                current_df = step['function'](current_df)
                results.append({
                    'step': step['name'],
                    'status': 'success',
                    'rows_affected': len(current_df)
                })
            except Exception as e:
                results.append({
                    'step': step['name'],
                    'status': 'failed',
                    'error': str(e)
                })
                break
        return current_df, results


def remove_duplicates(df):
    """Drop exact duplicate rows."""
    return df.drop_duplicates()


def standardize_dates(df):
    """Re-parse datetime columns, coercing unparseable entries to NaT."""
    date_columns = df.select_dtypes(include=['datetime64']).columns
    for col in date_columns:
        df[col] = pd.to_datetime(df[col], errors='coerce')
    return df


pipeline = DataCleaningPipeline()
pipeline.add_step('remove_duplicates', remove_duplicates)
pipeline.add_step('standardize_dates', standardize_dates)
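
# Running the pipeline above on a small fabricated frame; the duplicate first
# row should be dropped and the report should show one entry per step.
if __name__ == '__main__':
    _frame = pd.DataFrame({
        'id': [1, 1, 2],
        'signed_up': pd.to_datetime(['2021-01-01', '2021-01-01', '2021-06-15'])
    })
    _cleaned, _report = pipeline.execute(_frame)
    assert len(_cleaned) == 2
    # _report, e.g.: [{'step': 'remove_duplicates', 'status': 'success', 'rows_affected': 2},
    #                 {'step': 'standardize_dates', 'status': 'success', 'rows_affected': 2}]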

# Automate String Cleaning and Standardization
def clean_text_columns(df, columns=None):
    """
    Apply standardized text cleaning to specified columns.

    Args:
        df (pd.DataFrame): Input dataframe
        columns (list): List of columns to clean. If None, clean all object columns

    Returns:
        pd.DataFrame: Dataframe with cleaned text columns
    """
    if columns is None:
        columns = df.select_dtypes(include=['object']).columns

    df = df.copy()
    for column in columns:
        if column not in df.columns:
            continue
        missing = df[column].isna()  # Remember missing entries before casting to str
        # Apply string cleaning operations
        df[column] = (df[column]
                      .astype(str)
                      .str.strip()
                      .str.lower()
                      .replace(r'\s+', ' ', regex=True)      # Collapse repeated whitespace
                      .replace(r'[^\w\s]', '', regex=True))  # Remove special characters
        df.loc[missing, column] = pd.NA  # Restore NAs that astype(str) stringified
    return df
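
# Illustration with fabricated strings; missing entries should stay missing
# rather than becoming the literal string 'nan'.
if __name__ == '__main__':
    _messy = pd.DataFrame({'city': ['  New   York! ', 'SAN-FRANCISCO', None]})
    # Expect: 'new york', 'sanfrancisco', <NA>
    print(clean_text_columns(_messy))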

# Monitor Data Quality Over Time
def generate_quality_metrics(df, baseline_metrics=None):
    """
    Generate quality metrics for a dataset and compare with baseline if provided.

    Args:
        df (pd.DataFrame): Input dataframe
        baseline_metrics (dict): Previous metrics to compare against

    Returns:
        dict: Current metrics and comparison with baseline
    """
    metrics = {
        'row_count': len(df),
        'missing_values': df.isna().sum().to_dict(),
        'unique_values': df.nunique().to_dict(),
        'data_types': df.dtypes.astype(str).to_dict()
    }

    # Add descriptive statistics for numeric columns
    numeric_columns = df.select_dtypes(include=['number']).columns
    metrics['numeric_stats'] = df[numeric_columns].describe().to_dict()

    # Compare with baseline if provided (columns absent from the baseline
    # default to zero missing values)
    if baseline_metrics:
        metrics['changes'] = {
            'row_count_change': metrics['row_count'] - baseline_metrics['row_count'],
            'missing_values_change': {
                col: metrics['missing_values'][col] - baseline_metrics['missing_values'].get(col, 0)
                for col in metrics['missing_values']
            }
        }
    return metrics
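
# Comparing two fabricated snapshots of the same table: two rows and one
# extra missing value were added between the baseline and the current frame.
if __name__ == '__main__':
    _before = pd.DataFrame({'amount': [1.0, 2.0, None]})
    _after = pd.DataFrame({'amount': [1.0, 2.0, 3.0, None, None]})
    _baseline = generate_quality_metrics(_before)
    _current = generate_quality_metrics(_after, baseline_metrics=_baseline)
    # Expect: {'row_count_change': 2, 'missing_values_change': {'amount': 1}}
    print(_current['changes'])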