LSST Applications g042eb84c57+730a74494b,g04e9c324dd+8c5ae1fdc5,g134cb467dc+1f1e3e7524,g199a45376c+0ba108daf9,g1fd858c14a+fa7d31856b,g210f2d0738+f66ac109ec,g262e1987ae+83a3acc0e5,g29ae962dfc+d856a2cb1f,g2cef7863aa+aef1011c0b,g35bb328faa+8c5ae1fdc5,g3fd5ace14f+a1e0c9f713,g47891489e3+0d594cb711,g4d44eb3520+c57ec8f3ed,g4d7b6aa1c5+f66ac109ec,g53246c7159+8c5ae1fdc5,g56a1a4eaf3+fd7ad03fde,g64539dfbff+f66ac109ec,g67b6fd64d1+0d594cb711,g67fd3c3899+f66ac109ec,g6985122a63+0d594cb711,g74acd417e5+3098891321,g786e29fd12+668abc6043,g81db2e9a8d+98e2ab9f28,g87389fa792+8856018cbb,g89139ef638+0d594cb711,g8d7436a09f+80fda9ce03,g8ea07a8fe4+760ca7c3fc,g90f42f885a+033b1d468d,g97be763408+a8a29bda4b,g99822b682c+e3ec3c61f9,g9d5c6a246b+0d5dac0c3d,ga41d0fce20+9243b26dd2,gbf99507273+8c5ae1fdc5,gd7ef33dd92+0d594cb711,gdab6d2f7ff+3098891321,ge410e46f29+0d594cb711,geaed405ab2+c4bbc419c6,gf9a733ac38+8c5ae1fdc5,w.2025.38
LSST Data Management Base Package
Loading...
Searching...
No Matches
lsst.dax.apdb.scripts.metrics Namespace Reference

Functions

None metrics_log_to_influx (Iterable[str] file, str context_keys, str extra_tags, bool fix_row_count, str mode, str prefix, bool no_header, str header_database)
 
None _metrics_log_to_influx (TextIO file, Iterable[str] context_keys, dict[str, Any] extra_tags, set[str] drop_tags, bool fix_row_count, str mode, str prefix)
 
None _print_metrics (str name, dict[str, Any] tags, dict[str, Any] values, float timestamp)
 
dict[str, Any] _extract_mdc (re.Match match, Iterable[str] context_keys)
 

Variables

 _LOG_LINE_RE_PIPELINE
 
 _LOG_LINE_RE_REPLICATION
 
 _LOG_LINE_RE_AP_PROTO
 
 _LOG_LINE_RE_JSON_LINE = re.compile("^(?P<metric>.*)$")
 
 _LOG_LINE_CASSANDRA_RE
 
 _AP_PIPE_DIAOBJECTS_RE = re.compile(r"Calculating summary stats for (?P<count>\d+) DiaObjects")
 
 _AP_PIPE_DIASOURCES_RE
 
 _AP_PIPE_DIAFORCED_RE = re.compile(r"Updating (?P<count>\d+) diaForcedSources in the APDB")
 
tuple _CASSNDRA_MESSAGES_RE
 
set _SKIP_METRICS_REPLICATION
 
set _SKIP_METRICS_AP_PROTO
 

Function Documentation

◆ _extract_mdc()

dict[str, Any] lsst.dax.apdb.scripts.metrics._extract_mdc ( re.Match match,
Iterable[str] context_keys )
protected

Definition at line 276 of file metrics.py.

276def _extract_mdc(match: re.Match, context_keys: Iterable[str]) -> dict[str, Any]:
277 tags: dict[str, Any] = {}
278 mdc_str = match.group("MDC")
279 if mdc_str:
280 mdc_str = mdc_str.replace("'", '"')
281 mdc: dict[str, Any] = yaml.safe_load(io.StringIO(mdc_str))
282 for tag in context_keys:
283 if (tag_val := mdc.get(tag)) is not None:
284 tags[tag] = tag_val
285 return tags

◆ _metrics_log_to_influx()

None lsst.dax.apdb.scripts.metrics._metrics_log_to_influx ( TextIO file,
Iterable[str] context_keys,
dict[str, Any] extra_tags,
set[str] drop_tags,
bool fix_row_count,
str mode,
str prefix )
protected
Parse metrics from a single file.

Definition at line 178 of file metrics.py.

186) -> None:
187 """Parse metrics from a single file."""
188 objects_count = -1
189 sources_count = -1
190 forced_sources_count = -1
191
192 match mode:
193 case "pipeline":
194 line_re = _LOG_LINE_RE_PIPELINE
195 case "replication":
196 line_re = _LOG_LINE_RE_REPLICATION
197 case "ap_proto":
198 line_re = _LOG_LINE_RE_AP_PROTO
199 case "json_line":
200 line_re = _LOG_LINE_RE_JSON_LINE
201 case _:
202 raise ValueError(f"Unexpected mode: {mode}")
203
204 for line in file:
205 line = line.strip()
206 if fix_row_count and mode == "pipeline":
207 # Counts come from separate AP messages.
208 if match := _AP_PIPE_DIAOBJECTS_RE.search(line):
209 objects_count = int(match.group("count"))
210 elif match := _AP_PIPE_DIASOURCES_RE.search(line):
211 sources_count = int(match.group("count1")) + int(match.group("count2"))
212 elif match := _AP_PIPE_DIAFORCED_RE.search(line):
213 forced_sources_count = int(match.group("count"))
214
215 if match := line_re.match(line):
216 metric_str = match.group("metric")
217 try:
218 metric: dict[str, Any] = json.loads(metric_str)
219 except json.JSONDecodeError:
 220 # Ignore parsing errors, sometimes it happens that lines are
221 # scrambled.
222 continue
223 tags = dict(extra_tags)
224
225 name: str = metric["name"]
226 if mode == "replication":
227 if name in _SKIP_METRICS_REPLICATION:
228 continue
229 elif mode == "ap_proto":
230 if name in _SKIP_METRICS_AP_PROTO:
231 continue
232
233 timestamp: float = metric["timestamp"]
234 for tag, tag_val in metric["tags"].items():
235 tags[tag] = tag_val
236 values: dict[str, Any] = metric["values"]
237
238 if fix_row_count and name == "insert_time":
239 if tags["table"].startswith("DiaObject"):
240 values["row_count"] = objects_count
241 elif tags["table"].startswith("DiaSource"):
242 values["row_count"] = sources_count
243 elif tags["table"].startswith("DiaForcedSource"):
244 values["row_count"] = forced_sources_count
245
246 if mode == "pipeline" and context_keys:
247 tags.update(_extract_mdc(match, context_keys))
248
249 for tag in drop_tags:
250 tags.pop(tag, None)
251
252 _print_metrics(prefix + name, tags, values, timestamp)
253
254 elif match := _LOG_LINE_CASSANDRA_RE.match(line):
255 tags = dict(extra_tags)
256 tags["level"] = match.group("level").lower()
257 dt = datetime.fromisoformat(match.group("datetime"))
258 timestamp = dt.timestamp()
259 tags.update(_extract_mdc(match, context_keys))
260 values = {"count": 1}
261
262 message = match.group("message")
263 for message_re, name in _CASSNDRA_MESSAGES_RE:
264 if (message_match := message_re.search(message)) is not None:
265 tags.update(message_match.groupdict())
266 _print_metrics(prefix + name, tags, values, timestamp)
267 break
268
269

◆ _print_metrics()

None lsst.dax.apdb.scripts.metrics._print_metrics ( str name,
dict[str, Any] tags,
dict[str, Any] values,
float timestamp )
protected

Definition at line 270 of file metrics.py.

270def _print_metrics(name: str, tags: dict[str, Any], values: dict[str, Any], timestamp: float) -> None:
271 tags_str = ",".join([name] + [f"{key}={val}" for key, val in tags.items()])
272 values_str = ",".join(f"{key}={val}" for key, val in values.items())
273 print(f"{tags_str} {values_str} {int(timestamp * 1e9)}")
274
275

◆ metrics_log_to_influx()

None lsst.dax.apdb.scripts.metrics.metrics_log_to_influx ( Iterable[str] file,
str context_keys,
str extra_tags,
bool fix_row_count,
str mode,
str prefix,
bool no_header,
str header_database )
Extract metrics from log file and dump as InfluxDB data.

Parameters
----------
file : `~collections.abc.Iterable` [`str`]
    Names of the files to parse for metrics.
context_keys : `str`
    Names of keys to extract from message context, comma-separated.
extra_tags : `str`
    Additional tags to add to each record, comma-separated key=value pairs.
fix_row_count : `bool`
    If True then extract record counts from pipeline messages instead of
    metrics. A workaround for broken metrics.
mode : `str`
    Source of the log, one of "ap_proto", "pipeline", "replication",
    "json_line".
prefix : `str`
    Prefix to add to each metric name.
no_header : `bool`
    If True then do not print DML header.
header_database : `str`
    Name of the database for DML header.

Definition at line 115 of file metrics.py.

124) -> None:
125 """Extract metrics from log file and dump as InfluxDB data.
126
127 Parameters
128 ----------
129 file : `~collections.abc.Iterable` [`str`]
130 Names of the files to parse for metrics.
131 context_keys : `str`
132 Names of keys to extract from message context, comma-separated.
133 extra_tags : `str`
134 Additional tags to add to each record, comma-separated key=value pairs.
135 fix_row_count : `bool`
 136 If True then extract record counts from pipeline messages instead of
137 metrics. A workaround for broken metrics.
138 mode : `str`
139 Source of the log, one of "ap_proto", "pipeline", "replication",
140 "json_line".
141 prefix : `str`
 142 Prefix to add to each metric name.
143 no_header : `bool`
 144 If True then do not print DML header.
145 header_database : `str`
146 Name of the database for DML header.
147 """
148 context_names = [name for name in context_keys.split(",") if name]
149 tags: dict[str, Any] = {}
150 drop_tags: set[str] = set()
151 for tag_val in extra_tags.split(","):
152 if tag_val:
153 tag, _, val = tag_val.partition("=")
154 if tag.startswith("-"):
155 drop_tags.add(tag.strip("-"))
156 else:
157 tags[tag] = val
158
159 if not no_header:
160 print(
161 f"""\
162# DML
163
164# CONTEXT-DATABASE: {header_database}
165"""
166 )
167
168 if not file:
169 file = ["-"]
170 for file_name in file:
171 if file_name == "-":
172 _metrics_log_to_influx(sys.stdin, context_names, tags, drop_tags, fix_row_count, mode, prefix)
173 else:
174 with open(file_name) as file_obj:
175 _metrics_log_to_influx(file_obj, context_names, tags, drop_tags, fix_row_count, mode, prefix)
176
177

Variable Documentation

◆ _AP_PIPE_DIAFORCED_RE

lsst.dax.apdb.scripts.metrics._AP_PIPE_DIAFORCED_RE = re.compile(r"Updating (?P<count>\d+) diaForcedSources in the APDB")
protected

Definition at line 86 of file metrics.py.

◆ _AP_PIPE_DIAOBJECTS_RE

lsst.dax.apdb.scripts.metrics._AP_PIPE_DIAOBJECTS_RE = re.compile(r"Calculating summary stats for (?P<count>\d+) DiaObjects")
protected

Definition at line 82 of file metrics.py.

◆ _AP_PIPE_DIASOURCES_RE

lsst.dax.apdb.scripts.metrics._AP_PIPE_DIASOURCES_RE
protected
Initial value:
1= re.compile(
2 r"(?P<count1>\d+) updated and \d+ unassociated diaObjects. Creating (?P<count2>\d+) new diaObjects"
3)

Definition at line 83 of file metrics.py.

◆ _CASSNDRA_MESSAGES_RE

tuple lsst.dax.apdb.scripts.metrics._CASSNDRA_MESSAGES_RE
protected
Initial value:
1= (
2 (re.compile(r"^Error preparing query for host (?P<host>\S+):$"), "error_prepare_query"),
3 (re.compile(r"^Control connection failed to connect"), "error_control_connect"),
4 (
5 re.compile(r"^Unexpected failure handling node (?P<host>\S+) being marked up:$"),
6 "error_failure_marking_up",
7 ),
8 (re.compile(r"^Failed to submit task to executor$"), "error_submit_task"),
9 (re.compile(r"^Failed to create connection pool for new host (?P<host>\S+):$"), "warn_create_pool"),
10 (re.compile(r"^Error attempting to reconnect to (?P<host>\S+),"), "warn_reconnect"),
11 (re.compile(r"^Host (?P<host>\S+) has been marked down"), "warn_host_down"),
12)

Definition at line 88 of file metrics.py.

◆ _LOG_LINE_CASSANDRA_RE

lsst.dax.apdb.scripts.metrics._LOG_LINE_CASSANDRA_RE
protected
Initial value:
1= re.compile(
2 ,
3 re.VERBOSE,
4)

Definition at line 71 of file metrics.py.

◆ _LOG_LINE_RE_AP_PROTO

lsst.dax.apdb.scripts.metrics._LOG_LINE_RE_AP_PROTO
protected
Initial value:
1= re.compile(
2 ,
3 re.VERBOSE,
4)

Definition at line 57 of file metrics.py.

◆ _LOG_LINE_RE_JSON_LINE

lsst.dax.apdb.scripts.metrics._LOG_LINE_RE_JSON_LINE = re.compile("^(?P<metric>.*)$")
protected

Definition at line 68 of file metrics.py.

◆ _LOG_LINE_RE_PIPELINE

lsst.dax.apdb.scripts.metrics._LOG_LINE_RE_PIPELINE
protected
Initial value:
1= re.compile(
2 ,
3 re.VERBOSE,
4)

Definition at line 36 of file metrics.py.

◆ _LOG_LINE_RE_REPLICATION

lsst.dax.apdb.scripts.metrics._LOG_LINE_RE_REPLICATION
protected
Initial value:
1= re.compile(
2 ,
3 re.VERBOSE,
4)

Definition at line 47 of file metrics.py.

◆ _SKIP_METRICS_AP_PROTO

set lsst.dax.apdb.scripts.metrics._SKIP_METRICS_AP_PROTO
protected
Initial value:
1= {
2 "read_metadata_config",
3 "version_check",
4 "insert_build_time",
5}

Definition at line 108 of file metrics.py.

◆ _SKIP_METRICS_REPLICATION

set lsst.dax.apdb.scripts.metrics._SKIP_METRICS_REPLICATION
protected
Initial value:
1= {
2 "read_metadata_config",
3 "version_check",
4}

Definition at line 103 of file metrics.py.