API
corAlign.py
Go to the documentation of this file.
1 import sys
2 import logging
3 from enum import Enum
4 import time
5 import numpy as np
6 
7 import xconf
8 
9 from magaox.indi.device import XDevice, BaseConfig
10 from magaox.camera import XCam
11 from magaox.constants import StateCodes
12 
13 from hcipy import util, make_circular_aperture, Field
14 
15 from skimage import feature
16 
17 
18 from purepyindi2 import device, properties, constants
19 from purepyindi2.messages import DefNumber, DefSwitch, DefLight, DefText
20 
21 from utils import *
22 
# NOTE(review): the `class` header line was missing from this listing. The
# name is taken from the `camera : CameraConfig` annotation in CorAlignConfig;
# the absence of a base class is an assumption -- confirm against the repository.
@xconf.config
class CameraConfig:
    """Shared-memory image (shmim) names for the camera used for alignment.

    Both fields are required strings (no default is given).
    """
    shmim : str = xconf.field(help="Name of the camera device (specifically, the associated shmim, if different)")
    dark_shmim : str = xconf.field(help="Name of the dark frame shmim associated with this camera device")
29 
@xconf.config
class CorAlignConfig(BaseConfig):
    """Automatic coronagraph alignment assistant

    Configuration for the ``corAlign`` device: which camera to read and how
    long to sleep between ``loop()`` calls.
    """
    # Camera (shmim names) the device grabs frames from.
    camera : CameraConfig = xconf.field(help="Camera to use")
    # Defaults to 0.25 s between loop() calls.
    sleep_interval_sec : float = xconf.field(default=0.25, help="Sleep interval between loop() calls")
36 
class States(Enum):
    """Operating modes of the corAlign measurement loop."""
    # No measurement is made.
    IDLE = 0
    # Selected by the 'fpm' switch: the focal-plane-mask shift is measured on
    # every loop() call.
    CLOSED_LOOP = 1
    # Selected by the 'psf' switch: one reference measurement is made, then
    # the device returns to IDLE.
    PSF = 2
41 
class corAlign(XDevice):
    """INDI device that measures coronagraph alignment from camera frames.

    The device publishes these properties:

    * ``fsm.state``   -- text, the name of a ``StateCodes`` member.
    * ``state``       -- one-of-many switch (``idle`` / ``fpm`` / ``psf``)
      selecting the operating mode.
    * ``measurement`` -- loop counter and the measured x/y shift.
    * ``ref_x``, ``ref_y``, ``n_avg`` -- current/target number pairs.

    In ``fpm`` mode every ``loop()`` call grabs ``n_avg`` frames and publishes
    the measured mask shift.  In ``psf`` mode a single reference measurement
    is published to ``ref_x`` / ``ref_y`` and the device returns to idle.
    """
    config : CorAlignConfig

    def setup(self):
        """Create the INDI properties, open the camera and mark the device READY."""
        self.log.debug(f"I was configured! See? {self.config=}")
        fsm = properties.TextVector(name='fsm')
        fsm.add_element(DefText(name='state', _value=StateCodes.INITIALIZED.name))
        self.add_property(fsm)

        sv = properties.SwitchVector(
            name='state',
            rule=constants.SwitchRule.ONE_OF_MANY,
            perm=constants.PropertyPerm.READ_WRITE,
        )
        sv.add_element(DefSwitch(name="idle", _value=constants.SwitchState.ON))
        sv.add_element(DefSwitch(name="fpm", _value=constants.SwitchState.OFF))
        sv.add_element(DefSwitch(name="psf", _value=constants.SwitchState.OFF))
        self.add_property(sv, callback=self.handle_state)

        nv = properties.NumberVector(name='measurement')
        nv.add_element(DefNumber(
            name='counter', label='Loop counter', format='%i',
            min=0, max=2**32-1, step=1, _value=0
        ))
        nv.add_element(DefNumber(
            name='x', label='X shift', format='%3.3f',
            min=-150.0, max=150.0, step=0.01, _value=0
        ))
        nv.add_element(DefNumber(
            name='y', label='Y shift', format='%3.3f',
            min=-150.0, max=150.0, step=0.01, _value=0
        ))
        self.add_property(nv)

        self._add_current_target_property(
            'ref_x', 'X ref', '%3.3f', -150.0, 150.0, 0.01, 0, self.handle_ref_x
        )
        self._add_current_target_property(
            'ref_y', 'Y ref', '%3.3f', -150.0, 150.0, 0.01, 0, self.handle_ref_y
        )
        self._add_current_target_property(
            'n_avg', 'Number of frames', '%i', 1, 150, 1, 1, self.handle_n_avg
        )

        # Ask the server for the focal-plane-mask wheel's properties; they are
        # read in measure_fpm_mask_shift().
        self.client.get_properties('fwfpm')
        #self.client.register_callback(func, device_name='fwfpm')

        self.log.info("Found camera: {:s}".format(self.config.camera.shmim))
        self.camera = XCam(self.config.camera.shmim, pixel_size=6.0/21.0, use_hcipy=True)
        self._state = States.IDLE

        self._loop_counter = 0
        self._ref_x = 0
        self._ref_y = 0
        self._n_avg = 1
        self.properties['fsm']['state'] = StateCodes.READY.name
        self.update_property(self.properties['fsm'])

    def _add_current_target_property(self, name, label, number_format,
                                     minimum, maximum, step, initial, callback):
        """Register a number vector with identical 'current' and 'target' elements."""
        nv = properties.NumberVector(name=name)
        for element_name in ('current', 'target'):
            nv.add_element(DefNumber(
                name=element_name, label=label, format=number_format,
                min=minimum, max=maximum, step=step, _value=initial
            ))
        self.add_property(nv, callback=callback)

    def _accept_target(self, existing_property, new_message):
        """Copy a changed 'target' from the message into the property.

        Returns the accepted value, or None when the message carries no
        'target' or the target equals the property's 'current' value.  The
        caller is responsible for publishing the property.
        """
        if 'target' not in new_message:
            return None
        requested = new_message['target']
        if requested == existing_property['current']:
            return None
        existing_property['current'] = requested
        existing_property['target'] = requested
        return requested

    def handle_n_avg(self, existing_property, new_message):
        """Callback for 'n_avg': set the number of frames grabbed per measurement."""
        accepted = self._accept_target(existing_property, new_message)
        if accepted is not None:
            self._n_avg = int(accepted)
        self.update_property(existing_property)

    def handle_state(self, existing_property, new_message):
        """Callback for the 'state' switch vector: change the operating mode.

        The mode changes only when the message turns ON exactly one of the
        idle/psf/fpm switches and that switch differs from the one currently
        ON.  Any other message leaves the mode untouched; the current switch
        values are published either way.
        """
        switch_on = constants.SwitchState.ON
        switch_off = constants.SwitchState.OFF
        transitions = {
            'idle': (States.IDLE, StateCodes.READY),
            'psf': (States.PSF, StateCodes.OPERATING),
            'fpm': (States.CLOSED_LOOP, StateCodes.OPERATING),
        }

        # Stays None if no switch is ON, instead of leaving the name unbound.
        current_state = None
        for key in transitions:
            if existing_property[key] == switch_on:
                current_state = key

        # Only a switch that is being turned ON is a request to enter a mode;
        # an element sent as OFF must not select that mode.
        requested = [
            key for key in transitions
            if key in new_message and new_message[key] == switch_on
        ]

        if len(requested) == 1 and requested[0] != current_state:
            target = requested[0]
            for key in transitions:
                existing_property[key] = switch_on if key == target else switch_off
            self._state, fsm_code = transitions[target]
            self.properties['fsm']['state'] = fsm_code.name
            self.update_property(self.properties['fsm'])

        self.update_property(existing_property)

    def handle_closed_loop(self, existing_property, new_message):
        """Callback for a 'toggle' switch: ON enters CLOSED_LOOP, OFF returns to IDLE.

        NOTE(review): setup() does not register this callback on any property.
        """
        if 'toggle' in new_message:
            requested = new_message['toggle']
            if requested is constants.SwitchState.ON:
                self.log.info("switch to closed-loop")
                existing_property['toggle'] = constants.SwitchState.ON
                self._state = States.CLOSED_LOOP
                self.properties['fsm']['state'] = StateCodes.OPERATING.name
            elif requested is constants.SwitchState.OFF:
                self.log.info("switch to IDLE")
                existing_property['toggle'] = constants.SwitchState.OFF
                self._state = States.IDLE
                self.properties['fsm']['state'] = StateCodes.READY.name

        self.update_property(existing_property)
        self.update_property(self.properties['fsm'])

    def handle_ref_x(self, existing_property, new_message):
        """Callback for 'ref_x': store a new X reference."""
        accepted = self._accept_target(existing_property, new_message)
        if accepted is not None:
            self._ref_x = accepted
        self.update_property(existing_property)

    def handle_ref_y(self, existing_property, new_message):
        """Callback for 'ref_y': store a new Y reference."""
        accepted = self._accept_target(existing_property, new_message)
        if accepted is not None:
            self._ref_y = accepted
        self.update_property(existing_property)

    def measure_fpm_mask_shift(self, image):
        """Measure the mask shift in `image` for the selected 'fwfpm' filter.

        The measurement routine is chosen from the ``fwfpm.filterName``
        switches.  Returns whatever the selected helper returns, or
        ``np.array([0.0, 0.0])`` for the 'spare' position and for any
        unrecognised position.
        """
        if self.client['fwfpm.filterName.lyotsm'] == constants.SwitchState.ON:
            return measure_center_position(image)
        elif self.client['fwfpm.filterName.lyotlg'] == constants.SwitchState.ON:
            self.log.debug("Is Lyot large mask")
            return measure_center_position(image)
        elif self.client['fwfpm.filterName.knifemask'] == constants.SwitchState.ON:
            return knife_edge_dist(image, theta=0, mask_diameter=200, threshold=0.5)
        elif self.client['fwfpm.filterName.spare'] == constants.SwitchState.ON:
            return np.array([0.0, 0.0])  # Not implemented yet.
        else:
            # Do nothing if in an unspecified position.
            return np.array([0.0, 0.0])

    def measure_psf_position(self, image):
        """Return the PSF reference position; currently always ``[0.0, 0.0]``.

        `image` is accepted but not used.
        """
        return np.array([0.0, 0.0])

    def transition_to_idle(self):
        """Select the 'idle' switch, publish it and mark the device READY."""
        self.properties['state']['psf'] = constants.SwitchState.OFF
        self.properties['state']['fpm'] = constants.SwitchState.OFF
        self.properties['state']['idle'] = constants.SwitchState.ON
        self.update_property(self.properties['state'])
        self._state = States.IDLE
        # Keep fsm.state in step with the switch, as handle_state() does for
        # 'idle'; otherwise it stays OPERATING after a PSF measurement.
        self.properties['fsm']['state'] = StateCodes.READY.name
        self.update_property(self.properties['fsm'])

    def loop(self):
        """Run one iteration of the measurement loop for the current mode."""
        if self._state == States.CLOSED_LOOP:
            image = self.camera.grab_stack(self._n_avg)
            center_error = self.measure_fpm_mask_shift(image)
            self.log.debug(f"{center_error}")
            self._loop_counter += 1

            # Send new values to indi server.
            measurement = self.properties['measurement']
            measurement['x'] = center_error[0]
            measurement['y'] = center_error[1]
            measurement['counter'] = self._loop_counter
            self.update_property(measurement)

        elif self._state == States.PSF:
            image = self.camera.grab_stack(self._n_avg)
            center_reference = self.measure_psf_position(image)

            # Keep the internal references in step with the published ones,
            # as handle_ref_x() / handle_ref_y() do.
            self._ref_x = center_reference[0]
            self._ref_y = center_reference[1]

            # Send new values to indi server.
            for name, value in (('ref_x', self._ref_x), ('ref_y', self._ref_y)):
                reference = self.properties[name]
                reference['current'] = value
                reference['target'] = value
                self.update_property(reference)

            # A PSF measurement is one-shot.
            self.transition_to_idle()
def measure_fpm_mask_shift(self, image)
Definition: corAlign.py:186
def handle_state(self, existing_property, new_message)
Definition: corAlign.py:130
def setup(self)
Definition: corAlign.py:45
def handle_closed_loop(self, existing_property, new_message)
Definition: corAlign.py:156
def transition_to_idle(self)
Definition: corAlign.py:203
def loop(self)
Definition: corAlign.py:210
def handle_ref_y(self, existing_property, new_message)
Definition: corAlign.py:179
def handle_ref_x(self, existing_property, new_message)
Definition: corAlign.py:172
def measure_psf_position(self, image)
Definition: corAlign.py:200
def handle_n_avg(self, existing_property, new_message)
Definition: corAlign.py:123
def measure_center_position(image, threshold=1, mask_diameter=20)
Definition: utils.py:6
def knife_edge_dist(image, theta=-0.47, mask_diameter=200, threshold=0.5)
Definition: utils.py:17