162) -> None:
163 """Parse metrics from a single file."""
164 objects_count = -1
165 sources_count = -1
166 forced_sources_count = -1
167
168 line_re = _LOG_LINE_RE_REPLICATION if replication else _LOG_LINE_RE_PIPELINE
169
170 for line in file:
171 line = line.strip()
172 if fix_row_count and not replication:
173 if match := _AP_PIPE_DIAOBJECTS_RE.search(line):
174 objects_count = int(match.group("count"))
175 elif match := _AP_PIPE_DIASOURCES_RE.search(line):
176 sources_count = int(match.group("count1")) + int(match.group("count2"))
177 elif match := _AP_PIPE_DIAFORCED_RE.search(line):
178 forced_sources_count = int(match.group("count"))
179
180 if match := line_re.match(line):
181 metric_str = match.group("metric")
182 metric: dict[str, Any] = json.loads(metric_str)
183 tags = dict(extra_tags)
184
185 name: str = metric["name"]
186 if replication and name in _SKIP_METRICS_REPLICATION:
187 continue
188
189 timestamp: float = metric["timestamp"]
190 for tag, tag_val in metric["tags"].items():
191 tags[tag] = tag_val
192 values: dict[str, Any] = metric["values"]
193
194 if fix_row_count and name == "insert_time":
195 if tags["table"].startswith("DiaObject"):
196 values["row_count"] = objects_count
197 elif tags["table"].startswith("DiaSource"):
198 values["row_count"] = sources_count
199 elif tags["table"].startswith("DiaForcedSource"):
200 values["row_count"] = forced_sources_count
201
202 if not replication and context_keys:
203 tags.update(_extract_mdc(match, context_keys))
204
205 _print_metrics(prefix + name, tags, values, timestamp)
206
207 elif match := _LOG_LINE_CASSANDRA_RE.match(line):
208 tags = dict(extra_tags)
209 tags["level"] = match.group("level").lower()
210 dt = datetime.fromisoformat(match.group("datetime"))
211 timestamp = dt.timestamp()
212 tags.update(_extract_mdc(match, context_keys))
213 values = {"count": 1}
214
215 message = match.group("message")
216 for message_re, name in _CASSNDRA_MESSAGES_RE:
217 if (message_match := message_re.search(message)) is not None:
218 tags.update(message_match.groupdict())
219 _print_metrics(prefix + name, tags, values, timestamp)
220 break
221
222