9 from magaox.indi.device
import XDevice, BaseConfig
10 from magaox.camera
import XCam
11 from magaox.constants
import StateCodes
13 from hcipy
import util, make_circular_aperture, Field
15 from skimage
import feature
18 from purepyindi2
import device, properties, constants
19 from purepyindi2.messages
import DefNumber, DefSwitch, DefLight, DefText
27 shmim : str = xconf.field(help=
"Name of the camera device (specifically, the associated shmim, if different)")
28 dark_shmim : str = xconf.field(help=
"Name of the dark frame shmim associated with this camera device")
32 """Automatic coronagraph alignment assistant
34 camera : CameraConfig = xconf.field(help=
"Camera to use")
35 sleep_interval_sec : float = xconf.field(default=0.25, help=
"Sleep interval between loop() calls")
43 config : CorAlignConfig
46 self.log.debug(f
"I was configured! See? {self.config=}")
47 fsm = properties.TextVector(name=
'fsm')
48 fsm.add_element(DefText(name=
'state', _value=StateCodes.INITIALIZED.name))
49 self.add_property(fsm)
51 sv = properties.SwitchVector(
53 rule=constants.SwitchRule.ONE_OF_MANY,
54 perm=constants.PropertyPerm.READ_WRITE,
56 sv.add_element(DefSwitch(name=
"idle", _value=constants.SwitchState.ON))
57 sv.add_element(DefSwitch(name=
"fpm", _value=constants.SwitchState.OFF))
58 sv.add_element(DefSwitch(name=
"psf", _value=constants.SwitchState.OFF))
59 self.add_property(sv, callback=self.
handle_statehandle_state)
61 nv = properties.NumberVector(name=
'measurement')
62 nv.add_element(DefNumber(
63 name=
'counter', label=
'Loop counter', format=
'%i',
64 min=0, max=2**32-1, step=1, _value=0
66 nv.add_element(DefNumber(
67 name=
'x', label=
'X shift', format=
'%3.3f',
68 min=-150.0, max=150.0, step=0.01, _value=0
70 nv.add_element(DefNumber(
71 name=
'y', label=
'Y shift', format=
'%3.3f',
72 min=-150.0, max=150.0, step=0.01, _value=0
76 nv = properties.NumberVector(name=
'ref_x')
77 nv.add_element(DefNumber(
78 name=
'current', label=
'X ref', format=
'%3.3f',
79 min=-150.0, max=150.0, step=0.01, _value=0
81 nv.add_element(DefNumber(
82 name=
'target', label=
'X ref', format=
'%3.3f',
83 min=-150.0, max=150.0, step=0.01, _value=0
85 self.add_property(nv, callback=self.
handle_ref_xhandle_ref_x)
87 nv = properties.NumberVector(name=
'ref_y')
88 nv.add_element(DefNumber(
89 name=
'current', label=
'Y ref', format=
'%3.3f',
90 min=-150.0, max=150.0, step=0.01, _value=0
92 nv.add_element(DefNumber(
93 name=
'target', label=
'Y ref', format=
'%3.3f',
94 min=-150.0, max=150.0, step=0.01, _value=0
96 self.add_property(nv, callback=self.
handle_ref_yhandle_ref_y)
98 nv = properties.NumberVector(name=
'n_avg')
99 nv.add_element(DefNumber(
100 name=
'current', label=
'Number of frames', format=
'%i',
101 min=1, max=150, step=1, _value=1
103 nv.add_element(DefNumber(
104 name=
'target', label=
'Number of frames', format=
'%i',
105 min=1, max=150, step=1, _value=1
107 self.add_property(nv, callback=self.
handle_n_avghandle_n_avg)
109 self.client.get_properties(
'fwfpm')
112 self.log.info(
"Found camera: {:s}".format(self.config.camera.shmim))
113 self.
cameracamera = XCam(self.config.camera.shmim, pixel_size=6.0/21.0, use_hcipy=
True)
120 self.properties[
'fsm'][
'state'] = StateCodes.READY.name
121 self.update_property(self.properties[
'fsm'])
def handle_n_avg(self, existing_property, new_message):
    """Apply a client request to change the number of frames to average.

    When the message carries a ``target`` element that differs from the
    property's ``current`` element, both elements are set to the requested
    value and its integer form is cached on ``self._n_avg``.

    Parameters
    ----------
    existing_property
        The property vector this callback is registered for (``n_avg``).
    new_message
        Incoming message; only its ``target`` element is read.
    """
    if 'target' in new_message:
        requested = new_message['target']
        if requested != existing_property['current']:
            existing_property['current'] = requested
            existing_property['target'] = requested
            # The property elements keep the value as received; only the
            # cached copy is coerced to int.
            self._n_avg = int(requested)
    # Published on every request, changed or not.
    # NOTE(review): confirm this call sits outside the guard in the original.
    self.update_property(existing_property)
131 target_list = [
'idle',
'psf',
'fpm']
132 for key
in target_list:
133 if existing_property[key] == constants.SwitchState.ON:
136 if current_state
not in new_message:
138 for key
in target_list:
139 existing_property[key] = constants.SwitchState.OFF
140 if key
in new_message:
141 existing_property[key] = new_message[key]
144 self.
_state_state = States.IDLE
145 self.properties[
'fsm'][
'state'] = StateCodes.READY.name
147 self.
_state_state = States.PSF
148 self.properties[
'fsm'][
'state'] = StateCodes.OPERATING.name
150 self.
_state_state = States.CLOSED_LOOP
151 self.properties[
'fsm'][
'state'] = StateCodes.OPERATING.name
153 self.update_property(existing_property)
154 self.update_property(self.properties[
'fsm'])
def handle_closed_loop(self, existing_property, new_message):
    """Enter or leave closed-loop operation according to a ``toggle`` request.

    ``toggle`` ON selects ``States.CLOSED_LOOP`` and reports ``OPERATING`` on
    the ``fsm`` property; ``toggle`` OFF selects ``States.IDLE`` and reports
    ``READY``. Both the toggled property and ``fsm`` are published afterwards
    whether or not anything changed.

    Parameters
    ----------
    existing_property
        The switch property this callback is registered for; its ``toggle``
        element is overwritten with the requested state.
    new_message
        Incoming message; only its ``toggle`` element is read.
    """
    if 'toggle' in new_message:
        requested = new_message['toggle']
        # Compared with ``==`` like the other switch checks in this device.
        if requested == constants.SwitchState.ON:
            self.log.info("switch to closed-loop")
            existing_property['toggle'] = constants.SwitchState.ON
            self._state = States.CLOSED_LOOP
            self.properties['fsm']['state'] = StateCodes.OPERATING.name
        elif requested == constants.SwitchState.OFF:
            self.log.info("switch to IDLE")
            existing_property['toggle'] = constants.SwitchState.OFF
            self._state = States.IDLE
            self.properties['fsm']['state'] = StateCodes.READY.name

    self.update_property(existing_property)
    self.update_property(self.properties['fsm'])
def handle_ref_x(self, existing_property, new_message):
    """Apply a client request to change the X reference position.

    When the message carries a ``target`` element that differs from the
    property's ``current`` element, both elements are set to the requested
    value and the same value is cached on ``self._ref_x``.

    Parameters
    ----------
    existing_property
        The property vector this callback is registered for (``ref_x``).
    new_message
        Incoming message; only its ``target`` element is read.
    """
    if 'target' in new_message:
        requested = new_message['target']
        if requested != existing_property['current']:
            existing_property['current'] = requested
            existing_property['target'] = requested
            self._ref_x = requested
    # Published on every request, changed or not.
    # NOTE(review): confirm this call sits outside the guard in the original.
    self.update_property(existing_property)
def handle_ref_y(self, existing_property, new_message):
    """Apply a client request to change the Y reference position.

    When the message carries a ``target`` element that differs from the
    property's ``current`` element, both elements are set to the requested
    value and the same value is cached on ``self._ref_y``.

    Parameters
    ----------
    existing_property
        The property vector this callback is registered for (``ref_y``).
    new_message
        Incoming message; only its ``target`` element is read.
    """
    if 'target' in new_message:
        requested = new_message['target']
        if requested != existing_property['current']:
            existing_property['current'] = requested
            existing_property['target'] = requested
            self._ref_y = requested
    # Published on every request, changed or not.
    # NOTE(review): confirm this call sits outside the guard in the original.
    self.update_property(existing_property)
187 if self.client[
'fwfpm.filterName.lyotsm'] == constants.SwitchState.ON:
189 elif self.client[
'fwfpm.filterName.lyotlg'] == constants.SwitchState.ON:
190 print(
"Is Lyot large mask")
192 elif self.client[
'fwfpm.filterName.knifemask'] == constants.SwitchState.ON:
193 return knife_edge_dist(image, theta=0, mask_diameter=200, threshold=0.5)
194 elif self.client[
'fwfpm.filterName.spare'] == constants.SwitchState.ON:
195 return np.array([0.0, 0.0])
198 return np.array([0.0, 0.0])
201 return np.array([0.0, 0.0])
def transition_to_idle(self):
    """Return the device to the idle state and publish the change.

    Sets the ``state`` switch to ``idle`` (``psf`` and ``fpm`` off), records
    ``States.IDLE`` internally, and reports ``READY`` on the ``fsm`` property.
    """
    state_switch = self.properties['state']
    state_switch['psf'] = constants.SwitchState.OFF
    state_switch['fpm'] = constants.SwitchState.OFF
    state_switch['idle'] = constants.SwitchState.ON
    self.update_property(state_switch)

    self._state = States.IDLE

    # The request handlers report READY on ``fsm`` whenever they select
    # States.IDLE; do the same here so ``fsm`` is not left at OPERATING.
    self.properties['fsm']['state'] = StateCodes.READY.name
    self.update_property(self.properties['fsm'])
211 if self.
_state_state == States.CLOSED_LOOP:
218 self.properties[
'measurement'][
'x'] = center_error[0]
219 self.properties[
'measurement'][
'y'] = center_error[1]
220 self.properties[
'measurement'][
'counter'] = self.
_loop_counter_loop_counter
221 self.update_property(self.properties[
'measurement'])
223 elif self.
_state_state == States.PSF:
228 self.properties[
'ref_x'][
'current'] = center_reference[0]
229 self.properties[
'ref_x'][
'target'] = center_reference[0]
230 self.update_property(self.properties[
'ref_x'])
232 self.properties[
'ref_y'][
'current'] = center_reference[1]
233 self.properties[
'ref_y'][
'target'] = center_reference[1]
234 self.update_property(self.properties[
'ref_y'])
def measure_fpm_mask_shift(self, image)
def handle_state(self, existing_property, new_message)
def handle_closed_loop(self, existing_property, new_message)
def transition_to_idle(self)
def handle_ref_y(self, existing_property, new_message)
def handle_ref_x(self, existing_property, new_message)
def measure_psf_position(self, image)
def handle_n_avg(self, existing_property, new_message)
def measure_center_position(image, threshold=1, mask_diameter=20)
def knife_edge_dist(image, theta=-0.47, mask_diameter=200, threshold=0.5)