186) -> None:
187 """Parse metrics from a single file."""
188 objects_count = -1
189 sources_count = -1
190 forced_sources_count = -1
191
192 match mode:
193 case "pipeline":
194 line_re = _LOG_LINE_RE_PIPELINE
195 case "replication":
196 line_re = _LOG_LINE_RE_REPLICATION
197 case "ap_proto":
198 line_re = _LOG_LINE_RE_AP_PROTO
199 case "json_line":
200 line_re = _LOG_LINE_RE_JSON_LINE
201 case _:
202 raise ValueError(f"Unexpected mode: {mode}")
203
204 for line in file:
205 line = line.strip()
206 if fix_row_count and mode == "pipeline":
207
208 if match := _AP_PIPE_DIAOBJECTS_RE.search(line):
209 objects_count = int(match.group("count"))
210 elif match := _AP_PIPE_DIASOURCES_RE.search(line):
211 sources_count = int(match.group("count1")) + int(match.group("count2"))
212 elif match := _AP_PIPE_DIAFORCED_RE.search(line):
213 forced_sources_count = int(match.group("count"))
214
215 if match := line_re.match(line):
216 metric_str = match.group("metric")
217 try:
218 metric: dict[str, Any] = json.loads(metric_str)
219 except json.JSONDecodeError:
220
221
222 continue
223 tags = dict(extra_tags)
224
225 name: str = metric["name"]
226 if mode == "replication":
227 if name in _SKIP_METRICS_REPLICATION:
228 continue
229 elif mode == "ap_proto":
230 if name in _SKIP_METRICS_AP_PROTO:
231 continue
232
233 timestamp: float = metric["timestamp"]
234 for tag, tag_val in metric["tags"].items():
235 tags[tag] = tag_val
236 values: dict[str, Any] = metric["values"]
237
238 if fix_row_count and name == "insert_time":
239 if tags["table"].startswith("DiaObject"):
240 values["row_count"] = objects_count
241 elif tags["table"].startswith("DiaSource"):
242 values["row_count"] = sources_count
243 elif tags["table"].startswith("DiaForcedSource"):
244 values["row_count"] = forced_sources_count
245
246 if mode == "pipeline" and context_keys:
247 tags.update(_extract_mdc(match, context_keys))
248
249 for tag in drop_tags:
250 tags.pop(tag, None)
251
252 _print_metrics(prefix + name, tags, values, timestamp)
253
254 elif match := _LOG_LINE_CASSANDRA_RE.match(line):
255 tags = dict(extra_tags)
256 tags["level"] = match.group("level").lower()
257 dt = datetime.fromisoformat(match.group("datetime"))
258 timestamp = dt.timestamp()
259 tags.update(_extract_mdc(match, context_keys))
260 values = {"count": 1}
261
262 message = match.group("message")
263 for message_re, name in _CASSNDRA_MESSAGES_RE:
264 if (message_match := message_re.search(message)) is not None:
265 tags.update(message_match.groupdict())
266 _print_metrics(prefix + name, tags, values, timestamp)
267 break
268
269