]> arthur.barton.de Git - netdata.git/blob - python.d/python_modules/pyyaml3/composer.py
bundle pyyaml
[netdata.git] / python.d / python_modules / pyyaml3 / composer.py
1
2 __all__ = ['Composer', 'ComposerError']
3
4 from .error import MarkedYAMLError
5 from .events import *
6 from .nodes import *
7
class ComposerError(MarkedYAMLError):
    """Raised when the event stream cannot be composed into a node graph."""
10
class Composer:
    """Mixin that assembles a stream of parser events into a node graph.

    The class produces no events itself.  It relies on the host class to
    provide ``check_event``, ``peek_event`` and ``get_event`` for reading
    the event stream, and ``resolve``, ``descend_resolver`` and
    ``ascend_resolver`` for implicit tag resolution.
    """

    def __init__(self):
        # Maps anchor name -> node for the document currently being
        # composed; reset at the end of every document.
        self.anchors = {}

    def check_node(self):
        """Return True if the stream still holds a document to compose."""
        # Drop the STREAM-START event.
        if self.check_event(StreamStartEvent):
            self.get_event()

        # Anything other than STREAM-END means another document follows.
        return not self.check_event(StreamEndEvent)

    def get_node(self):
        """Return the root node of the next document, or None at stream end."""
        if self.check_event(StreamEndEvent):
            return None
        return self.compose_document()

    def get_single_node(self):
        """Compose the only document of the stream and return its root node.

        Returns None when the stream contains no document.

        Raises:
            ComposerError: if a second document follows the first one.
        """
        # Drop the STREAM-START event.
        self.get_event()

        # Compose a document if the stream is not empty.
        document = None
        if not self.check_event(StreamEndEvent):
            document = self.compose_document()

        # Ensure that the stream contains no more documents.
        if not self.check_event(StreamEndEvent):
            event = self.get_event()
            raise ComposerError("expected a single document in the stream",
                    document.start_mark, "but found another document",
                    event.start_mark)

        # Drop the STREAM-END event.
        self.get_event()

        return document

    def compose_document(self):
        """Compose one document and return its root node."""
        # Drop the DOCUMENT-START event.
        self.get_event()

        # Compose the root node.
        node = self.compose_node(None, None)

        # Drop the DOCUMENT-END event.
        self.get_event()

        # Anchors are scoped to a single document.
        self.anchors = {}
        return node

    def compose_node(self, parent, index):
        """Compose the node described by the next event(s).

        Args:
            parent: the collection node being filled, or None for the root.
            index: position of the new node inside ``parent``: an integer
                for sequence items, None for mapping keys, and the key node
                for mapping values.

        Returns:
            The composed node.  For an alias this is the very node object
            that was registered under the anchor.

        Raises:
            ComposerError: on an undefined alias, a duplicate anchor, or an
                event that does not start a scalar, sequence or mapping.
        """
        if self.check_event(AliasEvent):
            event = self.get_event()
            anchor = event.anchor
            if anchor not in self.anchors:
                raise ComposerError(None, None, "found undefined alias %r"
                        % anchor, event.start_mark)
            return self.anchors[anchor]

        event = self.peek_event()
        anchor = event.anchor
        if anchor is not None and anchor in self.anchors:
            raise ComposerError("found duplicate anchor %r; first occurrence"
                    % anchor, self.anchors[anchor].start_mark,
                    "second occurrence", event.start_mark)

        self.descend_resolver(parent, index)
        if self.check_event(ScalarEvent):
            node = self.compose_scalar_node(anchor)
        elif self.check_event(SequenceStartEvent):
            node = self.compose_sequence_node(anchor)
        elif self.check_event(MappingStartEvent):
            node = self.compose_mapping_node(anchor)
        else:
            # Without this branch ``node`` would be unbound below and the
            # caller would see an UnboundLocalError.  Undo the descent so
            # the resolver stays balanced, then report the problem.
            self.ascend_resolver()
            raise ComposerError(None, None,
                    "expected a node, but found %s" % type(event).__name__,
                    getattr(event, 'start_mark', None))
        self.ascend_resolver()
        return node

    def compose_scalar_node(self, anchor):
        """Consume a scalar event and return the matching ScalarNode.

        The node is registered under ``anchor`` when ``anchor`` is not None.
        """
        event = self.get_event()
        tag = event.tag
        # A missing or non-specific ('!') tag is resolved implicitly.
        if tag is None or tag == '!':
            tag = self.resolve(ScalarNode, event.value, event.implicit)
        node = ScalarNode(tag, event.value,
                event.start_mark, event.end_mark, style=event.style)
        if anchor is not None:
            self.anchors[anchor] = node
        return node

    def compose_sequence_node(self, anchor):
        """Consume events up to SEQUENCE-END and return a SequenceNode.

        The node is registered under ``anchor`` when ``anchor`` is not None.
        """
        start_event = self.get_event()
        tag = start_event.tag
        # A missing or non-specific ('!') tag is resolved implicitly.
        if tag is None or tag == '!':
            tag = self.resolve(SequenceNode, None, start_event.implicit)
        node = SequenceNode(tag, [],
                start_event.start_mark, None,
                flow_style=start_event.flow_style)
        # Register before composing the items so that aliases inside the
        # sequence can refer back to the sequence itself.
        if anchor is not None:
            self.anchors[anchor] = node
        index = 0
        while not self.check_event(SequenceEndEvent):
            node.value.append(self.compose_node(node, index))
            index += 1
        end_event = self.get_event()
        node.end_mark = end_event.end_mark
        return node

    def compose_mapping_node(self, anchor):
        """Consume events up to MAPPING-END and return a MappingNode.

        The node value is a list of ``(key_node, value_node)`` pairs in the
        order they were read.  Duplicate keys are not rejected here.  The
        node is registered under ``anchor`` when ``anchor`` is not None.
        """
        start_event = self.get_event()
        tag = start_event.tag
        # A missing or non-specific ('!') tag is resolved implicitly.
        if tag is None or tag == '!':
            tag = self.resolve(MappingNode, None, start_event.implicit)
        node = MappingNode(tag, [],
                start_event.start_mark, None,
                flow_style=start_event.flow_style)
        # Register before composing the pairs so that aliases inside the
        # mapping can refer back to the mapping itself.
        if anchor is not None:
            self.anchors[anchor] = node
        while not self.check_event(MappingEndEvent):
            item_key = self.compose_node(node, None)
            item_value = self.compose_node(node, item_key)
            node.value.append((item_key, item_value))
        end_event = self.get_event()
        node.end_mark = end_event.end_mark
        return node
139