API
 
Loading...
Searching...
No Matches
corAlign.py
Go to the documentation of this file.
1import sys
2import logging
3from enum import Enum
4import time
5import numpy as np
6
7import xconf
8
9from magaox.indi.device import XDevice, BaseConfig
10from magaox.camera import XCam
11from magaox.constants import StateCodes
12
13from hcipy import util, make_circular_aperture, Field
14
15from skimage import feature
16
17
18from purepyindi2 import device, properties, constants
19from purepyindi2.messages import DefNumber, DefSwitch, DefLight, DefText
20
21from utils import *
22
23@xconf.config
25 """
26 """
27 shmim : str = xconf.field(help="Name of the camera device (specifically, the associated shmim, if different)")
28 dark_shmim : str = xconf.field(help="Name of the dark frame shmim associated with this camera device")
29
30@xconf.config
31class CorAlignConfig(BaseConfig):
32 """Automatic coronagraph alignment assistant
33 """
34 camera : CameraConfig = xconf.field(help="Camera to use")
35 sleep_interval_sec : float = xconf.field(default=0.25, help="Sleep interval between loop() calls")
36
37class States(Enum):
38 IDLE = 0
39 CLOSED_LOOP = 1
40 PSF = 2
41
42class corAlign(XDevice):
43 config : CorAlignConfig
44
45 def setup(self):
46 self.log.debug(f"I was configured! See? {self.config=}")
47 fsm = properties.TextVector(name='fsm')
48 fsm.add_element(DefText(name='state', _value=StateCodes.INITIALIZED.name))
49 self.add_property(fsm)
50
51 sv = properties.SwitchVector(
52 name='state',
53 rule=constants.SwitchRule.ONE_OF_MANY,
54 perm=constants.PropertyPerm.READ_WRITE,
55 )
56 sv.add_element(DefSwitch(name="idle", _value=constants.SwitchState.ON))
57 sv.add_element(DefSwitch(name="fpm", _value=constants.SwitchState.OFF))
58 sv.add_element(DefSwitch(name="psf", _value=constants.SwitchState.OFF))
59 self.add_property(sv, callback=self.handle_state)
60
61 nv = properties.NumberVector(name='measurement')
62 nv.add_element(DefNumber(
63 name='counter', label='Loop counter', format='%i',
64 min=0, max=2**32-1, step=1, _value=0
65 ))
66 nv.add_element(DefNumber(
67 name='x', label='X shift', format='%3.3f',
68 min=-150.0, max=150.0, step=0.01, _value=0
69 ))
70 nv.add_element(DefNumber(
71 name='y', label='Y shift', format='%3.3f',
72 min=-150.0, max=150.0, step=0.01, _value=0
73 ))
74 self.add_property(nv)
75
76 nv = properties.NumberVector(name='ref_x')
77 nv.add_element(DefNumber(
78 name='current', label='X ref', format='%3.3f',
79 min=-150.0, max=150.0, step=0.01, _value=0
80 ))
81 nv.add_element(DefNumber(
82 name='target', label='X ref', format='%3.3f',
83 min=-150.0, max=150.0, step=0.01, _value=0
84 ))
85 self.add_property(nv, callback=self.handle_ref_x)
86
87 nv = properties.NumberVector(name='ref_y')
88 nv.add_element(DefNumber(
89 name='current', label='Y ref', format='%3.3f',
90 min=-150.0, max=150.0, step=0.01, _value=0
91 ))
92 nv.add_element(DefNumber(
93 name='target', label='Y ref', format='%3.3f',
94 min=-150.0, max=150.0, step=0.01, _value=0
95 ))
96 self.add_property(nv, callback=self.handle_ref_y)
97
98 nv = properties.NumberVector(name='n_avg')
99 nv.add_element(DefNumber(
100 name='current', label='Number of frames', format='%i',
101 min=1, max=150, step=1, _value=1
102 ))
103 nv.add_element(DefNumber(
104 name='target', label='Number of frames', format='%i',
105 min=1, max=150, step=1, _value=1
106 ))
107 self.add_property(nv, callback=self.handle_n_avg)
108
109 self.client.get_properties('fwfpm')
110 #self.client.register_callback(func, device_name='fwfpm')
111
112 self.log.info("Found camera: {:s}".format(self.config.camera.shmim))
113 self.camera = XCam(self.config.camera.shmim, pixel_size=6.0/21.0, use_hcipy=True)
114 self._state = States.IDLE
115
117 self._ref_x = 0
118 self._ref_y = 0
119 self._n_avg = 1
120 self.properties['fsm']['state'] = StateCodes.READY.name
121 self.update_property(self.properties['fsm'])
122
123 def handle_n_avg(self, existing_property, new_message):
124 if 'target' in new_message and new_message['target'] != existing_property['current']:
125 existing_property['current'] = new_message['target']
126 existing_property['target'] = new_message['target']
127 self._n_avg = int(new_message['target'])
128 self.update_property(existing_property)
129
130 def handle_state(self, existing_property, new_message):
131 target_list = ['idle', 'psf', 'fpm']
132 for key in target_list:
133 if existing_property[key] == constants.SwitchState.ON:
134 current_state = key
135
136 if current_state not in new_message:
137
138 for key in target_list:
139 existing_property[key] = constants.SwitchState.OFF # Turn everything off
140 if key in new_message:
141 existing_property[key] = new_message[key]
142
143 if key == 'idle':
144 self._state = States.IDLE
145 self.properties['fsm']['state'] = StateCodes.READY.name
146 elif key == 'psf':
147 self._state = States.PSF
148 self.properties['fsm']['state'] = StateCodes.OPERATING.name
149 elif key == 'fpm':
150 self._state = States.CLOSED_LOOP
151 self.properties['fsm']['state'] = StateCodes.OPERATING.name
152
153 self.update_property(existing_property)
154 self.update_property(self.properties['fsm'])
155
156 def handle_closed_loop(self, existing_property, new_message):
157 if 'toggle' in new_message and new_message['toggle'] is constants.SwitchState.ON:
158 print("switch to closed-loop")
159 existing_property['toggle'] = constants.SwitchState.ON
160 self._state = States.CLOSED_LOOP
161 self.properties['fsm']['state'] = StateCodes.OPERATING.name
162
163 if 'toggle' in new_message and new_message['toggle'] is constants.SwitchState.OFF:
164 print("switch to IDLE")
165 existing_property['toggle'] = constants.SwitchState.OFF
166 self._state = States.IDLE
167 self.properties['fsm']['state'] = StateCodes.READY.name
168
169 self.update_property(existing_property)
170 self.update_property(self.properties['fsm'])
171
172 def handle_ref_x(self, existing_property, new_message):
173 if 'target' in new_message and new_message['target'] != existing_property['current']:
174 existing_property['current'] = new_message['target']
175 existing_property['target'] = new_message['target']
176 self._ref_x = new_message['target']
177 self.update_property(existing_property)
178
179 def handle_ref_y(self, existing_property, new_message):
180 if 'target' in new_message and new_message['target'] != existing_property['current']:
181 existing_property['current'] = new_message['target']
182 existing_property['target'] = new_message['target']
183 self._ref_y = new_message['target']
184 self.update_property(existing_property)
185
186 def measure_fpm_mask_shift(self, image):
187 if self.client['fwfpm.filterName.lyotsm'] == constants.SwitchState.ON:
188 return measure_center_position(image)
189 elif self.client['fwfpm.filterName.lyotlg'] == constants.SwitchState.ON:
190 print("Is Lyot large mask")
191 return measure_center_position(image)
192 elif self.client['fwfpm.filterName.knifemask'] == constants.SwitchState.ON:
193 return knife_edge_dist(image, theta=0, mask_diameter=200, threshold=0.5)
194 elif self.client['fwfpm.filterName.spare'] == constants.SwitchState.ON:
195 return np.array([0.0, 0.0]) # Not implemented yet.
196 else:
197 # Do nothing if in an unspecified position.
198 return np.array([0.0, 0.0])
199
200 def measure_psf_position(self, image):
201 return np.array([0.0, 0.0])
202
204 self.properties['state']['psf'] = constants.SwitchState.OFF
205 self.properties['state']['fpm'] = constants.SwitchState.OFF
206 self.properties['state']['idle'] = constants.SwitchState.ON
207 self.update_property(self.properties['state'])
208 self._state = States.IDLE
209
210 def loop(self):
211 if self._state == States.CLOSED_LOOP:
212 image = self.camera.grab_stack(self._n_avg)
213 center_error = self.measure_fpm_mask_shift(image)
214 print(center_error)
215 self._loop_counter += 1
216
217 # Send new values to indi server.
218 self.properties['measurement']['x'] = center_error[0]
219 self.properties['measurement']['y'] = center_error[1]
220 self.properties['measurement']['counter'] = self._loop_counter
221 self.update_property(self.properties['measurement'])
222
223 elif self._state == States.PSF:
224 image = self.camera.grab_stack(self._n_avg)
225 center_reference = self.measure_psf_position(image)
226
227 # Send new values to indi server.
228 self.properties['ref_x']['current'] = center_reference[0]
229 self.properties['ref_x']['target'] = center_reference[0]
230 self.update_property(self.properties['ref_x'])
231
232 self.properties['ref_y']['current'] = center_reference[1]
233 self.properties['ref_y']['target'] = center_reference[1]
234 self.update_property(self.properties['ref_y'])
235
236 self.transition_to_idle()
measure_psf_position(self, image)
Definition corAlign.py:200
handle_n_avg(self, existing_property, new_message)
Definition corAlign.py:123
measure_fpm_mask_shift(self, image)
Definition corAlign.py:186
handle_closed_loop(self, existing_property, new_message)
Definition corAlign.py:156
CorAlignConfig config
Definition corAlign.py:43
transition_to_idle(self)
Definition corAlign.py:203
handle_ref_x(self, existing_property, new_message)
Definition corAlign.py:172
handle_state(self, existing_property, new_message)
Definition corAlign.py:130
handle_ref_y(self, existing_property, new_message)
Definition corAlign.py:179
measure_center_position(image, threshold=1, mask_diameter=20)
Definition utils.py:6
knife_edge_dist(image, theta=-0.47, mask_diameter=200, threshold=0.5)
Definition utils.py:17