Loading [MathJax]/extensions/tex2jax.js
LSST Applications g04a91732dc+cc8870d3f5,g07dc498a13+5aa0b8792f,g0fba68d861+80045be308,g1409bbee79+5aa0b8792f,g1a7e361dbc+5aa0b8792f,g1fd858c14a+f64bc332a9,g208c678f98+1ae86710ed,g35bb328faa+fcb1d3bbc8,g4d2262a081+47ad8a29a8,g4d39ba7253+9633a327c1,g4e0f332c67+5d362be553,g53246c7159+fcb1d3bbc8,g60b5630c4e+9633a327c1,g668ecb457e+25d63fd678,g78460c75b0+2f9a1b4bcd,g786e29fd12+cf7ec2a62a,g7b71ed6315+fcb1d3bbc8,g8852436030+8b64ca622a,g89139ef638+5aa0b8792f,g89e1512fd8+04325574d3,g8d6b6b353c+9633a327c1,g9125e01d80+fcb1d3bbc8,g989de1cb63+5aa0b8792f,g9f33ca652e+b196626af7,ga9baa6287d+9633a327c1,gaaedd4e678+5aa0b8792f,gabe3b4be73+1e0a283bba,gb1101e3267+71e32094df,gb58c049af0+f03b321e39,gb90eeb9370+2807b1ad02,gcf25f946ba+8b64ca622a,gd315a588df+a39986a76f,gd6cbbdb0b4+c8606af20c,gd9a9a58781+fcb1d3bbc8,gde0f65d7ad+4e42d81ab7,ge278dab8ac+932305ba37,ge82c20c137+76d20ab76d,gfe73954cf8+a1301e4c20,w.2025.11
LSST Data Management Base Package
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
lsst.dax.apdb.scripts.metrics Namespace Reference

Functions

None metrics_log_to_influx (Iterable[str] file, str context_keys, str extra_tags, bool fix_row_count, bool replication, str prefix, bool no_header, str header_database)
 
None _metrics_log_to_influx (TextIO file, Iterable[str] context_keys, dict[str, Any] extra_tags, bool fix_row_count, bool replication, str prefix)
 
None _print_metrics (str name, dict[str, Any] tags, dict[str, Any] values, float timestamp)
 
dict[str, Any] _extract_mdc (re.Match match, Iterable[str] context_keys)
 

Variables

 _LOG_LINE_RE_PIPELINE
 
 _LOG_LINE_RE_REPLICATION
 
 _LOG_LINE_CASSANDRA_RE
 
 _AP_PIPE_DIAOBJECTS_RE = re.compile(r"Calculating summary stats for (?P<count>\d+) DiaObjects")
 
 _AP_PIPE_DIASOURCES_RE
 
 _AP_PIPE_DIAFORCED_RE = re.compile(r"Updating (?P<count>\d+) diaForcedSources in the APDB")
 
tuple _CASSNDRA_MESSAGES_RE
 
set _SKIP_METRICS_REPLICATION
 

Function Documentation

◆ _extract_mdc()

dict[str, Any] lsst.dax.apdb.scripts.metrics._extract_mdc ( re.Match match,
Iterable[str] context_keys )
protected

Definition at line 229 of file metrics.py.

229def _extract_mdc(match: re.Match, context_keys: Iterable[str]) -> dict[str, Any]:
230 tags: dict[str, Any] = {}
231 mdc_str = match.group("MDC")
232 if mdc_str:
233 mdc_str = mdc_str.replace("'", '"')
234 mdc: dict[str, Any] = yaml.safe_load(io.StringIO(mdc_str))
235 for tag in context_keys:
236 if (tag_val := mdc.get(tag)) is not None:
237 tags[tag] = tag_val
238 return tags

◆ _metrics_log_to_influx()

None lsst.dax.apdb.scripts.metrics._metrics_log_to_influx ( TextIO file,
Iterable[str] context_keys,
dict[str, Any] extra_tags,
bool fix_row_count,
bool replication,
str prefix )
protected
Parse metrics from a single file.

Definition at line 155 of file metrics.py.

162) -> None:
163 """Parse metrics from a single log file and print each one via `_print_metrics`."""
164 objects_count = -1  # latest row counts seen in pipeline messages; -1 until seen
165 sources_count = -1
166 forced_sources_count = -1
167
168 line_re = _LOG_LINE_RE_REPLICATION if replication else _LOG_LINE_RE_PIPELINE  # the two log sources use different line formats
169
170 for line in file:
171 line = line.strip()
172 if fix_row_count and not replication:  # remember counts from AP pipeline messages for the row_count override below
173 if match := _AP_PIPE_DIAOBJECTS_RE.search(line):
174 objects_count = int(match.group("count"))
175 elif match := _AP_PIPE_DIASOURCES_RE.search(line):
176 sources_count = int(match.group("count1")) + int(match.group("count2"))  # sum of "updated" and "new" diaObjects in that message
177 elif match := _AP_PIPE_DIAFORCED_RE.search(line):
178 forced_sources_count = int(match.group("count"))
179
180 if match := line_re.match(line):  # line carrying a JSON-encoded metric
181 metric_str = match.group("metric")
182 metric: dict[str, Any] = json.loads(metric_str)
183 tags = dict(extra_tags)  # copy so per-line tags do not leak into extra_tags
184
185 name: str = metric["name"]
186 if replication and name in _SKIP_METRICS_REPLICATION:
187 continue  # metrics listed in _SKIP_METRICS_REPLICATION are not emitted for replication logs
188
189 timestamp: float = metric["timestamp"]
190 for tag, tag_val in metric["tags"].items():
191 tags[tag] = tag_val  # metric tags override extra_tags with the same key
192 values: dict[str, Any] = metric["values"]
193
194 if fix_row_count and name == "insert_time":  # replace reported row_count with the count from pipeline messages
195 if tags["table"].startswith("DiaObject"):  # NOTE(review): KeyError if an insert_time metric has no "table" tag
196 values["row_count"] = objects_count
197 elif tags["table"].startswith("DiaSource"):
198 values["row_count"] = sources_count
199 elif tags["table"].startswith("DiaForcedSource"):
200 values["row_count"] = forced_sources_count
201
202 if not replication and context_keys:
203 tags.update(_extract_mdc(match, context_keys))
204
205 _print_metrics(prefix + name, tags, values, timestamp)
206
207 elif match := _LOG_LINE_CASSANDRA_RE.match(line):  # Cassandra log line: emitted as a count=1 event if the message is recognized
208 tags = dict(extra_tags)
209 tags["level"] = match.group("level").lower()
210 dt = datetime.fromisoformat(match.group("datetime"))  # NOTE(review): a value without UTC offset is treated as local time by timestamp()
211 timestamp = dt.timestamp()
212 tags.update(_extract_mdc(match, context_keys))  # unlike the metric branch, called even when context_keys is empty
213 values = {"count": 1}
214
215 message = match.group("message")
216 for message_re, name in _CASSNDRA_MESSAGES_RE:  # first matching pattern wins; unrecognized messages are not printed
217 if (message_match := message_re.search(message)) is not None:
218 tags.update(message_match.groupdict())
219 _print_metrics(prefix + name, tags, values, timestamp)
220 break
221
222

◆ _print_metrics()

None lsst.dax.apdb.scripts.metrics._print_metrics ( str name,
dict[str, Any] tags,
dict[str, Any] values,
float timestamp )
protected

Definition at line 223 of file metrics.py.

223def _print_metrics(name: str, tags: dict[str, Any], values: dict[str, Any], timestamp: float) -> None:
224 tags_str = ",".join([name] + [f"{key}={val}" for key, val in tags.items()])
225 values_str = ",".join(f"{key}={val}" for key, val in values.items())
226 print(f"{tags_str} {values_str} {int(timestamp * 1e9)}")
227
228

◆ metrics_log_to_influx()

None lsst.dax.apdb.scripts.metrics.metrics_log_to_influx ( Iterable[str] file,
str context_keys,
str extra_tags,
bool fix_row_count,
bool replication,
str prefix,
bool no_header,
str header_database )
Extract metrics from log file and dump as InfluxDB data.

Parameters
----------
file : `~collections.abc.Iterable` [`str`]
    Names of the files to parse for metrics. A name of ``-`` means standard
    input; if no names are given, standard input is read.
context_keys : `str`
    Names of keys to extract from message context, comma-separated.
extra_tags : `str`
    Additional tags to add to each record, comma-separated key=value pairs.
fix_row_count : `bool`
    If True then extract record counts from pipeline messages instead of
    metrics. A workaround for broken metrics.
replication : `bool`
    If True then the log is from replication service, otherwise it is a log
    from AP pipeline.
prefix : `str`
    Prefix to add to each metric name.
no_header : `bool`
    If True then do not print DML header.
header_database : `str`
    Name of the database for DML header.

Definition at line 96 of file metrics.py.

105) -> None:
106 """Extract metrics from log file and dump as InfluxDB data.
107
108 Parameters
109 ----------
110 file : `~collections.abc.Iterable` [`str`]
111 Names of the files to parse for metrics; "-" or an empty list means standard input.
112 context_keys : `str`
113 Names of keys to extract from message context, comma-separated.
114 extra_tags : `str`
115 Additional tags to add to each record, comma-separated key=value pairs.
116 fix_row_count : `bool`
117 If True then extract record counts from pipeline messages instead of
118 metrics. A workaround for broken metrics.
119 replication : `bool`
120 If True then the log is from replication service, otherwise it is a log
121 from AP pipeline.
122 prefix : `str`
123 Prefix to add to each metric name.
124 no_header : `bool`
125 If True then do not print DML header.
126 header_database : `str`
127 Name of the database for DML header.
128 """
129 context_names = [name for name in context_keys.split(",") if name]  # empty items (e.g. trailing commas) are dropped
130 tags: dict[str, Any] = {}
131 for tag_val in extra_tags.split(","):  # an item without "=" gets an empty value
132 if tag_val:
133 tag, _, val = tag_val.partition("=")
134 tags[tag] = val
135
136 if not no_header:  # header is printed unless no_header is True
137 print(
138 f"""\
139# DML
140
141# CONTEXT-DATABASE: {header_database}
142"""
143 )
144
145 if not file:  # no file names given: read standard input
146 file = ["-"]
147 for file_name in file:
148 if file_name == "-":
149 _metrics_log_to_influx(sys.stdin, context_names, tags, fix_row_count, replication, prefix)
150 else:
151 with open(file_name) as file_obj:
152 _metrics_log_to_influx(file_obj, context_names, tags, fix_row_count, replication, prefix)
153
154

Variable Documentation

◆ _AP_PIPE_DIAFORCED_RE

lsst.dax.apdb.scripts.metrics._AP_PIPE_DIAFORCED_RE = re.compile(r"Updating (?P<count>\d+) diaForcedSources in the APDB")
protected

Definition at line 73 of file metrics.py.

◆ _AP_PIPE_DIAOBJECTS_RE

lsst.dax.apdb.scripts.metrics._AP_PIPE_DIAOBJECTS_RE = re.compile(r"Calculating summary stats for (?P<count>\d+) DiaObjects")
protected

Definition at line 69 of file metrics.py.

◆ _AP_PIPE_DIASOURCES_RE

lsst.dax.apdb.scripts.metrics._AP_PIPE_DIASOURCES_RE
protected
Initial value:
1= re.compile(
2 r"(?P<count1>\d+) updated and \d+ unassociated diaObjects. Creating (?P<count2>\d+) new diaObjects"
3)

Definition at line 70 of file metrics.py.

◆ _CASSNDRA_MESSAGES_RE

tuple lsst.dax.apdb.scripts.metrics._CASSNDRA_MESSAGES_RE
protected
Initial value:
1= (
2 (re.compile(r"^Error preparing query for host (?P<host>\S+):$"), "error_prepare_query"),
3 (re.compile(r"^Control connection failed to connect"), "error_control_connect"),
4 (
5 re.compile(r"^Unexpected failure handling node (?P<host>\S+) being marked up:$"),
6 "error_failure_marking_up",
7 ),
8 (re.compile(r"^Failed to submit task to executor$"), "error_submit_task"),
9 (re.compile(r"^Failed to create connection pool for new host (?P<host>\S+):$"), "warn_create_pool"),
10 (re.compile(r"^Error attempting to reconnect to (?P<host>\S+),"), "warn_reconnect"),
11 (re.compile(r"^Host (?P<host>\S+) has been marked down"), "warn_host_down"),
12)

Definition at line 75 of file metrics.py.

◆ _LOG_LINE_CASSANDRA_RE

lsst.dax.apdb.scripts.metrics._LOG_LINE_CASSANDRA_RE
protected
Initial value:
1= re.compile(
2 ,
3 re.VERBOSE,
4)

Definition at line 58 of file metrics.py.

◆ _LOG_LINE_RE_PIPELINE

lsst.dax.apdb.scripts.metrics._LOG_LINE_RE_PIPELINE
protected
Initial value:
1= re.compile(
2 ,
3 re.VERBOSE,
4)

Definition at line 36 of file metrics.py.

◆ _LOG_LINE_RE_REPLICATION

lsst.dax.apdb.scripts.metrics._LOG_LINE_RE_REPLICATION
protected
Initial value:
1= re.compile(
2 ,
3 re.VERBOSE,
4)

Definition at line 47 of file metrics.py.

◆ _SKIP_METRICS_REPLICATION

set lsst.dax.apdb.scripts.metrics._SKIP_METRICS_REPLICATION
protected
Initial value:
1= {
2 "read_metadata_config",
3 "version_check",
4}

Definition at line 90 of file metrics.py.