LSST Applications g04e9c324dd+8c5ae1fdc5,g134cb467dc+b203dec576,g18429d2f64+358861cd2c,g199a45376c+0ba108daf9,g1fd858c14a+dd066899e3,g262e1987ae+ebfced1d55,g29ae962dfc+72fd90588e,g2cef7863aa+aef1011c0b,g35bb328faa+8c5ae1fdc5,g3fd5ace14f+b668f15bc5,g4595892280+3897dae354,g47891489e3+abcf9c3559,g4d44eb3520+fb4ddce128,g53246c7159+8c5ae1fdc5,g67b6fd64d1+abcf9c3559,g67fd3c3899+1f72b5a9f7,g74acd417e5+cb6b47f07b,g786e29fd12+668abc6043,g87389fa792+8856018cbb,g89139ef638+abcf9c3559,g8d7436a09f+bcf525d20c,g8ea07a8fe4+9f5ccc88ac,g90f42f885a+6054cc57f1,g97be763408+06f794da49,g9dd6db0277+1f72b5a9f7,ga681d05dcb+7e36ad54cd,gabf8522325+735880ea63,gac2eed3f23+abcf9c3559,gb89ab40317+abcf9c3559,gbf99507273+8c5ae1fdc5,gd8ff7fe66e+1f72b5a9f7,gdab6d2f7ff+cb6b47f07b,gdc713202bf+1f72b5a9f7,gdfd2d52018+8225f2b331,ge365c994fd+375fc21c71,ge410e46f29+abcf9c3559,geaed405ab2+562b3308c0,gf9a733ac38+8c5ae1fdc5,w.2025.35
LSST Data Management Base Package
Loading...
Searching...
No Matches
lsst.dax.apdb.scripts.metrics Namespace Reference

Functions

None metrics_log_to_influx (Iterable[str] file, str context_keys, str extra_tags, bool fix_row_count, str mode, str prefix, bool no_header, str header_database)
 
None _metrics_log_to_influx (TextIO file, Iterable[str] context_keys, dict[str, Any] extra_tags, set[str] drop_tags, bool fix_row_count, str mode, str prefix)
 
None _print_metrics (str name, dict[str, Any] tags, dict[str, Any] values, float timestamp)
 
dict[str, Any] _extract_mdc (re.Match match, Iterable[str] context_keys)
 

Variables

 _LOG_LINE_RE_PIPELINE
 
 _LOG_LINE_RE_REPLICATION
 
 _LOG_LINE_RE_AP_PROTO
 
 _LOG_LINE_RE_JSON_LINE = re.compile("^(?P<metric>.*)$")
 
 _LOG_LINE_CASSANDRA_RE
 
 _AP_PIPE_DIAOBJECTS_RE = re.compile(r"Calculating summary stats for (?P<count>\d+) DiaObjects")
 
 _AP_PIPE_DIASOURCES_RE
 
 _AP_PIPE_DIAFORCED_RE = re.compile(r"Updating (?P<count>\d+) diaForcedSources in the APDB")
 
tuple _CASSNDRA_MESSAGES_RE
 
set _SKIP_METRICS_REPLICATION
 
set _SKIP_METRICS_AP_PROTO
 

Function Documentation

◆ _extract_mdc()

dict[str, Any] lsst.dax.apdb.scripts.metrics._extract_mdc ( re.Match match,
Iterable[str] context_keys )
protected

Definition at line 276 of file metrics.py.

276def _extract_mdc(match: re.Match, context_keys: Iterable[str]) -> dict[str, Any]:
277 tags: dict[str, Any] = {}
278 mdc_str = match.group("MDC")
279 if mdc_str:
280 mdc_str = mdc_str.replace("'", '"')
281 mdc: dict[str, Any] = yaml.safe_load(io.StringIO(mdc_str))
282 for tag in context_keys:
283 if (tag_val := mdc.get(tag)) is not None:
284 tags[tag] = tag_val
285 return tags

◆ _metrics_log_to_influx()

None lsst.dax.apdb.scripts.metrics._metrics_log_to_influx ( TextIO file,
Iterable[str] context_keys,
dict[str, Any] extra_tags,
set[str] drop_tags,
bool fix_row_count,
str mode,
str prefix )
protected
Parse metrics from a single file.

Definition at line 178 of file metrics.py.

186) -> None:
187 """Parse metrics from a single file."""
188 objects_count = -1
189 sources_count = -1
190 forced_sources_count = -1
191
192 match mode:
193 case "pipeline":
194 line_re = _LOG_LINE_RE_PIPELINE
195 case "replication":
196 line_re = _LOG_LINE_RE_REPLICATION
197 case "ap_proto":
198 line_re = _LOG_LINE_RE_AP_PROTO
199 case "json_line":
200 line_re = _LOG_LINE_RE_JSON_LINE
201 case _:
202 raise ValueError(f"Unexpected mode: {mode}")
203
204 for line in file:
205 line = line.strip()
206 if fix_row_count and mode == "pipeline":
207 # Counts come from separate AP messages.
208 if match := _AP_PIPE_DIAOBJECTS_RE.search(line):
209 objects_count = int(match.group("count"))
210 elif match := _AP_PIPE_DIASOURCES_RE.search(line):
211 sources_count = int(match.group("count1")) + int(match.group("count2"))
212 elif match := _AP_PIPE_DIAFORCED_RE.search(line):
213 forced_sources_count = int(match.group("count"))
214
215 if match := line_re.match(line):
216 metric_str = match.group("metric")
217 try:
218 metric: dict[str, Any] = json.loads(metric_str)
219 except json.JSONDecodeError:
220                # Ignore parsing errors, sometimes it happens that lines are
221 # scrambled.
222 continue
223 tags = dict(extra_tags)
224
225 name: str = metric["name"]
226 if mode == "replication":
227 if name in _SKIP_METRICS_REPLICATION:
228 continue
229 elif mode == "ap_proto":
230 if name in _SKIP_METRICS_AP_PROTO:
231 continue
232
233 timestamp: float = metric["timestamp"]
234 for tag, tag_val in metric["tags"].items():
235 tags[tag] = tag_val
236 values: dict[str, Any] = metric["values"]
237
238 if fix_row_count and name == "insert_time":
239 if tags["table"].startswith("DiaObject"):
240 values["row_count"] = objects_count
241 elif tags["table"].startswith("DiaSource"):
242 values["row_count"] = sources_count
243 elif tags["table"].startswith("DiaForcedSource"):
244 values["row_count"] = forced_sources_count
245
246 if mode == "pipeline" and context_keys:
247 tags.update(_extract_mdc(match, context_keys))
248
249 for tag in drop_tags:
250 tags.pop(tag, None)
251
252 _print_metrics(prefix + name, tags, values, timestamp)
253
254 elif match := _LOG_LINE_CASSANDRA_RE.match(line):
255 tags = dict(extra_tags)
256 tags["level"] = match.group("level").lower()
257 dt = datetime.fromisoformat(match.group("datetime"))
258 timestamp = dt.timestamp()
259 tags.update(_extract_mdc(match, context_keys))
260 values = {"count": 1}
261
262 message = match.group("message")
263 for message_re, name in _CASSNDRA_MESSAGES_RE:
264 if (message_match := message_re.search(message)) is not None:
265 tags.update(message_match.groupdict())
266 _print_metrics(prefix + name, tags, values, timestamp)
267 break
268
269

◆ _print_metrics()

None lsst.dax.apdb.scripts.metrics._print_metrics ( str name,
dict[str, Any] tags,
dict[str, Any] values,
float timestamp )
protected

Definition at line 270 of file metrics.py.

270def _print_metrics(name: str, tags: dict[str, Any], values: dict[str, Any], timestamp: float) -> None:
271 tags_str = ",".join([name] + [f"{key}={val}" for key, val in tags.items()])
272 values_str = ",".join(f"{key}={val}" for key, val in values.items())
273 print(f"{tags_str} {values_str} {int(timestamp * 1e9)}")
274
275

◆ metrics_log_to_influx()

None lsst.dax.apdb.scripts.metrics.metrics_log_to_influx ( Iterable[str] file,
str context_keys,
str extra_tags,
bool fix_row_count,
str mode,
str prefix,
bool no_header,
str header_database )
Extract metrics from log file and dump as InfluxDB data.

Parameters
----------
file : `~collections.abc.Iterable` [`str`]
    Names of the files to parse for metrics.
context_keys : `str`
    Names of keys to extract from message context, comma-separated.
extra_tags : `str`
    Additional tags to add to each record, comma-separated key=value pairs.
fix_row_count : `bool`
    If True then extract records counts from pipeline messages instead of
    metrics. A workaround for broken metrics.
mode : `str`
    Source of the log, one of "ap_proto", "pipeline", "replication",
    "json_line".
prefix : `str`
    Prefix to add to each metric name.
no_header : `bool`
    If True then do not print DML header.
header_database : `str`
    Name of the database for DML header.

Definition at line 115 of file metrics.py.

124) -> None:
125 """Extract metrics from log file and dump as InfluxDB data.
126
127 Parameters
128 ----------
129 file : `~collections.abc.Iterable` [`str`]
130 Names of the files to parse for metrics.
131 context_keys : `str`
132 Names of keys to extract from message context, comma-separated.
133 extra_tags : `str`
134 Additional tags to add to each record, comma-separated key=value pairs.
135 fix_row_count : `bool`
136 If True then extract records counts from pipeline messages instead of
137 metrics. A workaround for broken metrics.
138 mode : `str`
139 Source of the log, one of "ap_proto", "pipeline", "replication",
140 "json_line".
141 prefix : `str`
142        Prefix to add to each metric name.
143 no_header : `bool`
144        If True then do not print DML header.
145 header_database : `str`
146 Name of the database for DML header.
147 """
148 context_names = [name for name in context_keys.split(",") if name]
149 tags: dict[str, Any] = {}
150 drop_tags: set[str] = set()
151 for tag_val in extra_tags.split(","):
152 if tag_val:
153 tag, _, val = tag_val.partition("=")
154 if tag.startswith("-"):
155 drop_tags.add(tag.strip("-"))
156 else:
157 tags[tag] = val
158
159 if not no_header:
160 print(
161 f"""\
162# DML
163
164# CONTEXT-DATABASE: {header_database}
165"""
166 )
167
168 if not file:
169 file = ["-"]
170 for file_name in file:
171 if file_name == "-":
172 _metrics_log_to_influx(sys.stdin, context_names, tags, drop_tags, fix_row_count, mode, prefix)
173 else:
174 with open(file_name) as file_obj:
175 _metrics_log_to_influx(file_obj, context_names, tags, drop_tags, fix_row_count, mode, prefix)
176
177

Variable Documentation

◆ _AP_PIPE_DIAFORCED_RE

lsst.dax.apdb.scripts.metrics._AP_PIPE_DIAFORCED_RE = re.compile(r"Updating (?P<count>\d+) diaForcedSources in the APDB")
protected

Definition at line 86 of file metrics.py.

◆ _AP_PIPE_DIAOBJECTS_RE

lsst.dax.apdb.scripts.metrics._AP_PIPE_DIAOBJECTS_RE = re.compile(r"Calculating summary stats for (?P<count>\d+) DiaObjects")
protected

Definition at line 82 of file metrics.py.

◆ _AP_PIPE_DIASOURCES_RE

lsst.dax.apdb.scripts.metrics._AP_PIPE_DIASOURCES_RE
protected
Initial value:
1= re.compile(
2 r"(?P<count1>\d+) updated and \d+ unassociated diaObjects. Creating (?P<count2>\d+) new diaObjects"
3)

Definition at line 83 of file metrics.py.

◆ _CASSNDRA_MESSAGES_RE

tuple lsst.dax.apdb.scripts.metrics._CASSNDRA_MESSAGES_RE
protected
Initial value:
1= (
2 (re.compile(r"^Error preparing query for host (?P<host>\S+):$"), "error_prepare_query"),
3 (re.compile(r"^Control connection failed to connect"), "error_control_connect"),
4 (
5 re.compile(r"^Unexpected failure handling node (?P<host>\S+) being marked up:$"),
6 "error_failure_marking_up",
7 ),
8 (re.compile(r"^Failed to submit task to executor$"), "error_submit_task"),
9 (re.compile(r"^Failed to create connection pool for new host (?P<host>\S+):$"), "warn_create_pool"),
10 (re.compile(r"^Error attempting to reconnect to (?P<host>\S+),"), "warn_reconnect"),
11 (re.compile(r"^Host (?P<host>\S+) has been marked down"), "warn_host_down"),
12)

Definition at line 88 of file metrics.py.

◆ _LOG_LINE_CASSANDRA_RE

lsst.dax.apdb.scripts.metrics._LOG_LINE_CASSANDRA_RE
protected
Initial value:
1= re.compile(
2 ,
3 re.VERBOSE,
4)

Definition at line 71 of file metrics.py.

◆ _LOG_LINE_RE_AP_PROTO

lsst.dax.apdb.scripts.metrics._LOG_LINE_RE_AP_PROTO
protected
Initial value:
1= re.compile(
2 ,
3 re.VERBOSE,
4)

Definition at line 57 of file metrics.py.

◆ _LOG_LINE_RE_JSON_LINE

lsst.dax.apdb.scripts.metrics._LOG_LINE_RE_JSON_LINE = re.compile("^(?P<metric>.*)$")
protected

Definition at line 68 of file metrics.py.

◆ _LOG_LINE_RE_PIPELINE

lsst.dax.apdb.scripts.metrics._LOG_LINE_RE_PIPELINE
protected
Initial value:
1= re.compile(
2 ,
3 re.VERBOSE,
4)

Definition at line 36 of file metrics.py.

◆ _LOG_LINE_RE_REPLICATION

lsst.dax.apdb.scripts.metrics._LOG_LINE_RE_REPLICATION
protected
Initial value:
1= re.compile(
2 ,
3 re.VERBOSE,
4)

Definition at line 47 of file metrics.py.

◆ _SKIP_METRICS_AP_PROTO

set lsst.dax.apdb.scripts.metrics._SKIP_METRICS_AP_PROTO
protected
Initial value:
1= {
2 "read_metadata_config",
3 "version_check",
4 "insert_build_time",
5}

Definition at line 108 of file metrics.py.

◆ _SKIP_METRICS_REPLICATION

set lsst.dax.apdb.scripts.metrics._SKIP_METRICS_REPLICATION
protected
Initial value:
1= {
2 "read_metadata_config",
3 "version_check",
4}

Definition at line 103 of file metrics.py.