LSST Applications g0f08755f38+82efc23009,g12f32b3c4e+e7bdf1200e,g1653933729+a8ce1bb630,g1a0ca8cf93+50eff2b06f,g28da252d5a+52db39f6a5,g2bbee38e9b+37c5a29d61,g2bc492864f+37c5a29d61,g2cdde0e794+c05ff076ad,g3156d2b45e+41e33cbcdc,g347aa1857d+37c5a29d61,g35bb328faa+a8ce1bb630,g3a166c0a6a+37c5a29d61,g3e281a1b8c+fb992f5633,g414038480c+7f03dfc1b0,g41af890bb2+11b950c980,g5fbc88fb19+17cd334064,g6b1c1869cb+12dd639c9a,g781aacb6e4+a8ce1bb630,g80478fca09+72e9651da0,g82479be7b0+04c31367b4,g858d7b2824+82efc23009,g9125e01d80+a8ce1bb630,g9726552aa6+8047e3811d,ga5288a1d22+e532dc0a0b,gae0086650b+a8ce1bb630,gb58c049af0+d64f4d3760,gc28159a63d+37c5a29d61,gcf0d15dbbd+2acd6d4d48,gd7358e8bfb+778a810b6e,gda3e153d99+82efc23009,gda6a2b7d83+2acd6d4d48,gdaeeff99f8+1711a396fd,ge2409df99d+6b12de1076,ge79ae78c31+37c5a29d61,gf0baf85859+d0a5978c5a,gf3967379c6+4954f8c433,gfb92a5be7c+82efc23009,gfec2e1e490+2aaed99252,w.2024.46
LSST Data Management Base Package
Loading...
Searching...
No Matches
monitor.py
Go to the documentation of this file.
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
21
22from __future__ import annotations
23
24__all__ = ["MonAgent", "MonService", "LoggingMonHandler"]
25
26import contextlib
27import json
28import logging
29import os
30import time
31import warnings
32from abc import ABC, abstractmethod
33from collections.abc import Iterable, Iterator, Mapping
34from typing import TYPE_CHECKING, Any
35
36from lsst.utils.classes import Singleton
37
38if TYPE_CHECKING:
39 from contextlib import AbstractContextManager
40
41_TagsType = Mapping[str, str | int]
42
43
44_CONFIG_ENV = "DAX_APDB_MONITOR_CONFIG"
45"""Name of the envvar specifying service configuration."""
46
47
class MonHandler(ABC):
    """Interface for handlers of the monitoring records.

    Handlers are responsible for delivering monitoring records to their final
    destination, for example log file or time-series database.
    """

    @abstractmethod
    def handle(
        self, name: str, timestamp: float, tags: _TagsType, values: Mapping[str, Any], agent_name: str
    ) -> None:
        """Handle one monitoring record.

        Parameters
        ----------
        name : `str`
            Record name, arbitrary string.
        timestamp : `float`
            Time in seconds since UNIX epoch when record originated.
        tags : `~collections.abc.Mapping` [`str`, `str` or `int`]
            Tags associated with the record, may be empty.
        values : `~collections.abc.Mapping` [`str`, `Any`]
            Values associated with the record, usually never empty.
        agent_name : `str`
            Name of a client agent that produced this record.
        """
        raise NotImplementedError()
75
76
class MonAgent:
    """Client-side interface for adding monitoring records to the monitoring
    service.

    Parameters
    ----------
    name : `str`
        Client agent name, this is used for filtering of the records by the
        service and is also passed to monitoring handler as ``agent_name``.
    """

    def __init__(self, name: str = ""):
        self._name = name
        # All agents forward their records to the monitoring service.
        self._service = MonService()

    def add_record(
        self,
        name: str,
        *,
        values: Mapping[str, Any],
        tags: Mapping[str, str | int] | None = None,
        timestamp: float | None = None,
    ) -> None:
        """Send one record to monitoring service.

        Parameters
        ----------
        name : `str`
            Record name, arbitrary string.
        values : `~collections.abc.Mapping` [`str`, `Any`]
            Values associated with the record, usually never empty.
        tags : `~collections.abc.Mapping` [`str`, `str` or `int`], optional
            Tags associated with the record, may be empty or `None`.
        timestamp : `float`, optional
            Time in seconds since UNIX epoch when record originated. `None`
            is forwarded to the service unchanged.
        """
        self._service._add_record(
            agent_name=self._name,
            record_name=name,
            tags=tags,
            values=values,
            timestamp=timestamp,
        )

    def context_tags(self, tags: _TagsType) -> AbstractContextManager[None]:
        """Context manager that adds a set of tags to all records created
        inside the context.

        Parameters
        ----------
        tags : `~collections.abc.Mapping` [`str`, `str` or `int`]
            Tags associated with the records.

        Returns
        -------
        context : `contextlib.AbstractContextManager`
            Context manager returned by the service's ``context_tags``.

        Notes
        -----
        All calls to `add_record` that happen inside the corresponding context
        will add tags specified in this call. Tags specified in `add_record`
        will override matching tag names that are passed to this method. On
        exit from context a previous tag context is restored (which may be
        empty).
        """
        return self._service.context_tags(tags)
139
140
class MonFilter:
    """Filter for the names associated with client agents.

    Parameters
    ----------
    rule : `str`
        String specifying filtering rule for a single name, or catch-all rule.
        The rule consist of the agent name prefixed by minus or optional plus
        sign. Catch-all rule uses name "any". If the rule starts with minus
        sign then matching agent will be rejected. Otherwise matching agent
        is accepted.

    Notes
    -----
    The catch-all rule is stored with an empty agent name, so a rule whose
    name part is empty (``""``, ``"+"`` or ``"-"``) also behaves as catch-all.
    """

    def __init__(self, rule: str):
        self._accept = True
        if rule.startswith("-"):
            self._accept = False
            rule = rule[1:]
        elif rule.startswith("+"):
            rule = rule[1:]
        # Empty name is the internal marker of the catch-all rule.
        self.agent_name = "" if rule == "any" else rule

    def is_match_all(self) -> bool:
        """Return `True` if this rule is a catch-all rule.

        Returns
        -------
        is_match_all : `bool`
            `True` if rule name is `-any`, `+any`, or `any`.
        """
        return not self.agent_name

    def accept(self, agent_name: str) -> bool | None:
        """Return filtering decision for specified agent name.

        Parameters
        ----------
        agent_name : `str`
            Name of the client agent that produces monitoring record.

        Returns
        -------
        decision : `bool` or `None`
            `True` if the agent is accepted, `False` if agent is rejected.
            `None` is returned if this rule does not match agent name and
            decision should be made by the next rule.
        """
        if not self.agent_name or agent_name == self.agent_name:
            return self._accept
        return None
191
192
class MonService(metaclass=Singleton):
    """Class implementing monitoring service functionality.

    Notes
    -----
    This is a singleton class which serves all client agents in an application.
    It accepts records from agents, filters them based on a set of configured
    rules and forwards them to one or more configured handlers. By default
    there are no handlers defined which means that all records are discarded.
    Default set of filtering rules is empty which accepts all agent names.

    To produce a useful output from this service one has to add at least one
    handler using `add_handler` method (e.g. `LoggingMonHandler` instance).
    The `set_filters` methods can be used to specify the set of filtering
    rules.
    """

    _handlers: list[MonHandler] = []
    """List of active handlers."""

    _context_tags: _TagsType | None = None
    """Current tag context, these tags are added to each new record."""

    _filters: list[MonFilter] = []
    """Sequence of filters for agent names."""

    _initialized: bool = False
    """False before initialization."""

    def set_filters(self, rules: Iterable[str]) -> None:
        """Define a sequence of rules for filtering of the agent names.

        Parameters
        ----------
        rules : `~collections.abc.Iterable` [`str`]
            Ordered collection of rules. Each string specifies filtering rule
            for a single name, or catch-all rule. The rule consist of the
            agent name prefixed by minus or optional plus sign. Catch-all rule
            uses name "any". If the rule starts with minus sign then matching
            agent will be rejected. Otherwise matching agent is accepted.

        Notes
        -----
        The catch-all rule (`-any`, `+any`, or `any`) can be specified in any
        location in the sequence but it is always applied last. E.g.
        `["-any", "+agent1"]` behaves the same as `["+agent1", "-any"]`.
        If the set of rules does not include catch-all rule, filtering behaves
        as if it is added implicitly as `+any`. If several catch-all rules are
        given, only the last one is kept.

        Filtering code evaluates each rule in order. First rule that matches
        the agent name wins. Agent names are matched literally, wildcards are
        not supported and there are no parent/child relations between agent
        names (e.g `lsst.dax.apdb` and `lsst.dax.apdb.sql` are treated as
        independent names).
        """
        match_all: MonFilter | None = None
        # Replace (not extend) any previously configured rules.
        self._filters = []
        for rule in rules:
            mon_filter = MonFilter(rule)
            if mon_filter.is_match_all():
                match_all = mon_filter
            else:
                self._filters.append(mon_filter)
        # Catch-all rule always goes last so that named rules are tried first.
        if match_all is not None:
            self._filters.append(match_all)

    def _add_record(
        self,
        *,
        agent_name: str,
        record_name: str,
        values: Mapping[str, Any],
        tags: Mapping[str, str | int] | None = None,
        timestamp: float | None = None,
    ) -> None:
        """Add one monitoring record, this method is for use by agents only.

        Parameters
        ----------
        agent_name : `str`
            Name of the client agent producing the record, checked against
            the configured filters.
        record_name : `str`
            Record name, arbitrary string.
        values : `~collections.abc.Mapping` [`str`, `Any`]
            Values associated with the record.
        tags : `~collections.abc.Mapping` [`str`, `str` or `int`], optional
            Tags associated with the record, they override matching tags of
            the current tag context.
        timestamp : `float`, optional
            Time in seconds since UNIX epoch, current time is used if `None`.
        """
        if not self._initialized:
            try:
                self._default_init()
                self._initialized = True
            except Exception as exc:
                # Complain but continue.
                message = f"Error in configuration of monitoring service: {exc}"
                # Stack level does not really matter.
                warnings.warn(message, stacklevel=3)
        if not self._handlers:
            # Nobody to deliver to, record is discarded.
            return
        # Check every filter, accept if none makes any decision.
        for mon_filter in self._filters:
            accept = mon_filter.accept(agent_name)
            if accept is False:
                return
            if accept is True:
                break
        if timestamp is None:
            timestamp = time.time()
        if tags is None:
            tags = self._context_tags or {}
        elif self._context_tags:
            # Record tags take precedence over the context tags.
            tags = {**self._context_tags, **tags}
        for handler in self._handlers:
            handler.handle(record_name, timestamp, tags, values, agent_name)

    def _default_init(self) -> None:
        """Perform default initialization of the service.

        Raises
        ------
        ValueError
            Raised if the configuration string found in the environment
            cannot be parsed.
        """
        if env := os.environ.get(_CONFIG_ENV):
            # Configuration is specified as comma-separated list of items.
            # An item without a colon is treated as a filter (see set_filters
            # for syntax). An item with colons specifies a handler, for now
            # the only supported handler is logging, it is specified as
            # "logging:<logger-name>[:<level>]".
            filters: list[str] = []
            handlers: list[MonHandler] = []
            for item in env.split(","):
                pieces = item.split(":")
                if len(pieces) in (2, 3) and pieces[0] == "logging":
                    logger_name = pieces[1]
                    if len(pieces) == 3:
                        level_name = pieces[2]
                        level = logging.getLevelNamesMapping().get(level_name.upper())
                        if level is None:
                            raise ValueError(
                                f"Unknown logging level name {level_name!r} in {_CONFIG_ENV}={env!r}"
                            )
                    else:
                        level = logging.INFO
                    handlers.append(LoggingMonHandler(logger_name, level))
                elif len(pieces) == 1:
                    filters.extend(pieces)
                else:
                    raise ValueError(f"Unexpected format of item {item!r} in {_CONFIG_ENV}={env!r}")
            # Apply configuration only after the whole string parsed cleanly.
            for handler in handlers:
                self.add_handler(handler)
            self.set_filters(filters)

    @property
    def handlers(self) -> Iterable[MonHandler]:
        """Set of handlers defined currently."""
        return self._handlers

    def add_handler(self, handler: MonHandler) -> None:
        """Add one monitoring handler.

        Parameters
        ----------
        handler : `MonHandler`
            Handler instance. Adding a handler that is already registered
            has no effect.
        """
        # Manually adding handler means default initialization should be
        # skipped.
        self._initialized = True
        if handler not in self._handlers:
            self._handlers.append(handler)

    def remove_handler(self, handler: MonHandler) -> None:
        """Remove a monitoring handler.

        Parameters
        ----------
        handler : `MonHandler`
            Handler instance. Unknown handlers are silently ignored.
        """
        if handler in self._handlers:
            self._handlers.remove(handler)

    def _add_context_tags(self, tags: _TagsType) -> _TagsType | None:
        """Extend the tag context with new tags, overriding any tags that may
        already exist in a current context.

        Parameters
        ----------
        tags : `~collections.abc.Mapping` [`str`, `str` or `int`]
            Tags to add to the current context.

        Returns
        -------
        old_tags : `~collections.abc.Mapping` or `None`
            Tag context that was active before this call.
        """
        old_tags = self._context_tags
        # Always store a fresh dict so that later changes to the caller's
        # mapping do not leak into the context.
        self._context_tags = {**(old_tags or {}), **tags}
        return old_tags

    @contextlib.contextmanager
    def context_tags(self, tags: _TagsType) -> Iterator[None]:
        """Context manager that adds a set of tags to all records created
        inside the context.

        Typically clients will be using `MonAgent.context_tags`, which forwards
        to this method.

        Parameters
        ----------
        tags : `~collections.abc.Mapping` [`str`, `str` or `int`]
            Tags associated with the records.
        """
        old_context = self._add_context_tags(tags)
        try:
            yield
        finally:
            # Restore old context.
            self._context_tags = old_context
388
389
class LoggingMonHandler(MonHandler):
    """Implementation of the monitoring handler which dumps records formatted
    as JSON objects to `logging`.

    Parameters
    ----------
    logger_name : `str`
        Name of the `logging` logger to use for output.
    log_level : `int`, optional
        Logging level to use for output, default is `INFO`

    Notes
    -----
    The attributes of the formatted JSON object correspond to the parameters
    of `handle` method, except for `agent_name` which is mapped to `source`.
    The `tags` and `values` become JSON sub-objects with corresponding keys.
    """

    def __init__(self, logger_name: str, log_level: int = logging.INFO):
        self._logger = logging.getLogger(logger_name)
        self._level = log_level

    def handle(
        self, name: str, timestamp: float, tags: _TagsType, values: Mapping[str, Any], agent_name: str
    ) -> None:
        # Docstring is inherited from base class.
        record = {
            "name": name,
            "timestamp": timestamp,
            # json.dumps only serializes dict instances, so copy generic
            # mappings into plain dicts first.
            "tags": dict(tags),
            "values": dict(values),
            "source": agent_name,
        }
        msg = json.dumps(record)
        self._logger.log(self._level, msg)
None handle(self, str name, float timestamp, _TagsType tags, Mapping[str, Any] values, str agent_name)
Definition monitor.py:414
__init__(self, str logger_name, int log_level=logging.INFO)
Definition monitor.py:408
AbstractContextManager[None] context_tags(self, _TagsType tags)
Definition monitor.py:121
None add_record(self, str name, *Mapping[str, Any] values, Mapping[str, str|int]|None tags=None, float|None timestamp=None)
Definition monitor.py:99
__init__(self, str name="")
Definition monitor.py:88
bool|None accept(self, str agent_name)
Definition monitor.py:173
None handle(self, str name, float timestamp, _TagsType tags, Mapping[str, Any] values, str agent_name)
Definition monitor.py:58
Iterator[None] context_tags(self, _TagsType tags)
Definition monitor.py:375
None remove_handler(self, MonHandler handler)
Definition monitor.py:350
_TagsType|None _add_context_tags(self, _TagsType tags)
Definition monitor.py:361
None add_handler(self, MonHandler handler)
Definition monitor.py:336
None _add_record(self, *str agent_name, str record_name, Mapping[str, Any] values, Mapping[str, str|int]|None tags=None, float|None timestamp=None)
Definition monitor.py:267
Iterable[MonHandler] handlers(self)
Definition monitor.py:332
None set_filters(self, Iterable[str] rules)
Definition monitor.py:222