LINCam Python Interface

The LINCamRemote class provides remote control of the LINCam TCSPC camera over gRPC. It communicates with the LINCam Capture desktop application, which must be running and accessible on the network.

Connection

from photonscore.LINCam.LINCamRemote import LINCamRemote

# Connect to LINCam Capture running on the same machine
cam = LINCamRemote()                      # default: localhost:50051

# Or connect to a remote host
cam = LINCamRemote("192.168.1.42:50051")

After construction the object immediately synchronises with the device, so all status properties are available right away.

Quick Start

from photonscore.LINCam.LINCamRemote import LINCamRemote
import matplotlib.pyplot as plt

cam = LINCamRemote()
print(cam)                          # show live count rates and settings

# Point the device at the right hardware unit
cam.device_url = "192.168.1.10:5000"
cam.tdc_slopes = "//"               # rising edge on start and stop
cam.photocathode_on = True
cam.sync()                          # push changes and refresh status

# Acquire a 512×512 spatial image
cam.sync_histograms(xy_bins=512)
plt.imshow(cam.hist_2d)
plt.colorbar()
plt.show()

Settings Diff Pattern

Every settable property stages a change locally. Nothing is sent to the device until you call sync(). After sync() all read-only status properties (count rates, temperatures, …) are refreshed.

cam.time_binnig = 50           # 50 ps / channel
cam.sync()                     # commit the binning first: gate edges are converted with the synced value
cam.time_gate = [10.0, 50.0]   # gate from 10 ns to 50 ns
cam.sync()                     # commit the gate
print(cam.acquired_cps)        # now reflects gated count rate
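
Gate edges are given in nanoseconds, but the class source further down stages them as whole TDC channels, computed from the time_binnig value returned by the last sync(). A minimal sketch of that arithmetic follows; the function names ns_to_channels and channels_to_ns are illustrative and not part of the library.

```python
def ns_to_channels(ns: float, binning_ps: int) -> int:
    """Convert a gate edge in nanoseconds to a whole number of TDC channels."""
    # Same arithmetic as the time_gate_a / time_gate_b setters:
    # ns -> ps, divide by the bin width, truncate to an integer.
    return int(ns * 1000 / binning_ps)


def channels_to_ns(channels: int, binning_ps: int) -> float:
    """Convert TDC channels back to nanoseconds, as the getters do."""
    return binning_ps * channels / 1000


for edge_ns in (10.0, 50.0):
    ch = ns_to_channels(edge_ns, binning_ps=50)
    print(edge_ns, "ns ->", ch, "channels ->", channels_to_ns(ch, 50), "ns")
```

Because the conversion uses the binning known from the last sync(), a gate staged before a new time_binnig has been synced is computed with the old bin width.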

Device Handle

LINCam.LINCamRemote.LINCamRemote

Remote control handle for a LINCam TCSPC camera.

Wraps the gRPC LINCamRemote service exposed by the LINCam Capture application. All settable parameters are Python properties that stage changes locally; call sync() to commit them and read back the device state.

After construction the object immediately calls sync(), so all read-only properties (serial number, count rates, temperatures, …) are populated.

Attributes:

hist_2d: Last acquired 2-D spatial histogram as a square numpy array. Updated by sync_histograms().

hist_dt: Last acquired timing histogram (delta-t) as a 1-D numpy array. Updated by sync_histograms().

Source code in LINCam/LINCamRemote.py
class LINCamRemote:
  """Remote control handle for a LINCam TCSPC camera.

  Wraps the gRPC ``LINCamRemote`` service exposed by the LINCam Capture
  application.  All settable parameters are Python properties that stage
  changes locally; call :meth:`sync` to commit and read back device state.

  After construction the object immediately calls :meth:`sync` so that
  all read-only properties (serial number, count rates, temperatures, …)
  are populated.

  Attributes:
      hist_2d: Last acquired 2-D spatial histogram as a square numpy array.
          Updated by :meth:`sync_histograms`.
      hist_dt: Last acquired timing histogram (delta-t) as a 1-D numpy array.
          Updated by :meth:`sync_histograms`.
  """

  def __init__(self, target: str | None = None):
    """Connect to the LINCam Capture gRPC server.

    Args:
        target: Host and port of the gRPC server, e.g. ``"192.168.1.10:50051"``.
            Defaults to ``"localhost:50051"``.
    """
    self.grpc_target = target
    if target is None:
      self.grpc_target = "localhost:50051"
    self.grpc_channel = grpc.insecure_channel(
      self.grpc_target,
      options = [
        ('grpc.max_send_message_length', 1024 * 1024 * 1024),
        ('grpc.max_receive_message_length', 1024 * 1024 * 1024),
      ]
    )
    self.grpc_client = LINCamRemoteStub(self.grpc_channel)
    self.new_settings = LINCamRemoteSettings()
    self.read_settings = LINCamRemoteSettings()
    self.hist_2d = np.array([])
    self.hist_dt = np.array([])
    self.sync()

  def sync(self):
    """Push pending settings to the device and refresh all status fields.

    Sends the accumulated ``new_settings`` diff to the server.  The server
    returns the full current device state which is stored in
    ``read_settings``.  After the call ``new_settings`` is reset so the
    next :meth:`sync` only sends newly staged changes.
    """
    self.read_settings = self.grpc_client.Sync(self.new_settings)
    self.new_settings = LINCamRemoteSettings()

  def sync_histograms(self, xy_bins: int | None = 512, clear_after: bool = False):
    """Fetch the current 2-D spatial image and timing histogram from the device.

    The acquired data is stored in :attr:`hist_2d` (2-D numpy array, shape
    ``[xy_bins, xy_bins]``) and :attr:`hist_dt` (1-D numpy array with the
    delta-t histogram).

    The ``xy_bins`` parameter is rounded *up* to the nearest supported
    resolution: 256, 512, 1024, 2048, or 4096 pixels per side.  Values
    above 4096 are capped at 4096.

    Args:
        xy_bins: Requested edge length of the square spatial image in pixels.
            Supported values: 256, 512 (default), 1024, 2048, 4096.
            Pass ``None`` to skip the 2-D image (timing only).
        clear_after: If ``True`` the device resets its internal histogram
            buffers after the data is returned.
    """
    request = HistorgamsRequest()
    request.clear_after = clear_after
    if xy_bins is None:
      request.image_bins = 0
    else:
      if xy_bins <= 256:
        request.image_bins = 5
      elif xy_bins <= 512:
        request.image_bins = 4
      elif xy_bins <= 1024:
        request.image_bins = 3
      elif xy_bins <= 2048:
        request.image_bins = 2
      else:
        request.image_bins = 1
    res = self.grpc_client.GetHistograms(request)
    h2d = np.array(res.image)
    self.hist_2d = np.reshape(h2d, [res.image_size, res.image_size])
    self.hist_dt = np.array(res.delta_t)

  def __repr__(self) -> str:
    return self.__str__()

  def __str__(self) -> str:
    if not self.device_connected:
      return f"Not connected URL {self.device_url}"
    test_gen = "OFF" if self.test_generator == 0 else f"{self.test_generator} KHz"
    t_gate = "-" if not self.time_gate_on else f"[{self.time_gate_a} .. {self.time_gate_b}]"
    pc = "off"
    if self.photocathode_on:
      pc = "inactive"
    if self.photocathode_on_status:
      pc = "ACTIVE"

    cps_mcp = LINCamRemote.format_cps(self.mcp_cps)
    cps_acquired = LINCamRemote.format_cps(self.acquired_cps)
    cps_stop = LINCamRemote.format_cps(self.stop_cps)

    return "\n".join(
      [
        f"Device:          {self.device_sn} @ {self.device_url} ({self.device_uptime})",
        f"TDC Slopes:      {self.tdc_slopes}",
        f"Photocathode:    {pc}",
        f"Test generator:  {test_gen}",
        f"CFD thr/zero:    {self.stop_threshold}/{self.stop_zero_cross}",
        f"Timing",  #
        f"   binning:      {self.time_binnig} ps/ch",
        f"    offset:      {self.time_offset:.2f}ns",
        f"      gate:      {t_gate} ns",
        f"CPS",  #
        f"       MCP:      {cps_mcp}",
        f"  Acquired:      {cps_acquired}",
        f"      Stop:      {cps_stop}",
      ]
    )

  @staticmethod
  def format_cps(cps_hz):
    if (cps_hz < 1000):
      return f"{cps_hz} Hz"
    elif (cps_hz < 10_000):
      return f"{cps_hz / 1000:.2f} kHz"
    elif (cps_hz < 100_000):
      return f"{cps_hz / 1000:.1f} kHz"
    elif (cps_hz < 1000_000):
      return f"{int(cps_hz / 1000)} kHz"
    elif (cps_hz < 10_000_000):
      return f"{cps_hz / 1000_000:.2f} MHz"
    else:
      return f"{int(cps_hz / 1000_000)} MHz"

  def setup_recording(
    self,
    filename: str,
    filenumber: int = 0,
    seconds: int = 0,
    continious: bool = False,
  ):
    """Stage file recording parameters (call :meth:`sync` to apply).

    After calling this method set :attr:`is_recording` to ``True`` and
    call :meth:`sync` to start saving photon data to disk.

    Args:
        filename: Base path for the output ``.photons`` file.
            The device appends ``filenumber`` and a timestamp.
        filenumber: Numeric suffix appended to the filename (default 0).
        seconds: Chunk duration in seconds.  ``0`` means record until
            :attr:`is_recording` is set to ``False``.
        continious: If ``True`` the device automatically starts a new chunk
            file when the current one reaches ``seconds``.
    """
    s = self.new_settings
    s.file_path = filename
    s.file_number = filenumber
    s.record_chunk = seconds
    s.record_continious = continious

  @property
  def is_recording(self):
    """bool: Whether the device is currently writing photon data to disk."""
    return self.read_settings.is_recording

  @is_recording.setter
  def is_recording(self, value):
    self.new_settings.is_recording = value

  # Device controls {{{
  @property
  def device_url(self):
    """str: IP address and port of the physical LINCam device (e.g. ``"192.168.1.42:5000"``).

    Setting this tells LINCam Capture which hardware unit to connect to.
    Takes effect on the next :meth:`sync`.
    """
    return self.read_settings.device_url

  @device_url.setter
  def device_url(self, value):
    self.new_settings.device_url = value

  @property
  def tdc_start_rise(self):
    """bool: TDC start channel edge polarity — ``True`` = rising, ``False`` = falling."""
    return self.read_settings.tdc_start_rise

  @tdc_start_rise.setter
  def tdc_start_rise(self, value):
    self.new_settings.tdc_start_rise = value

  @property
  def tdc_stop_rise(self):
    """bool: TDC stop channel edge polarity — ``True`` = rising, ``False`` = falling."""
    return self.read_settings.tdc_stop_rise

  @tdc_stop_rise.setter
  def tdc_stop_rise(self, value):
    self.new_settings.tdc_stop_rise = value

  @property
  def tdc_slopes(self):
    """str: Compact slope notation for start and stop channels.

    A two-character string where ``"/"`` means rising and ``"\\"`` means
    falling edge.  For example ``"/\\"`` means start-rising, stop-falling.

    Setting this property updates :attr:`tdc_start_rise` and
    :attr:`tdc_stop_rise` simultaneously.
    """
    start = "/" if self.tdc_start_rise else "\\"
    stop = "/" if self.tdc_stop_rise else "\\"
    return start + stop

  @tdc_slopes.setter
  def tdc_slopes(self, value):
    if not isinstance(value, str):
      raise Exception("tdc_slopes shall be string")
    if len(value) > 0:
      if value[0] == "/":
        self.tdc_start_rise = True
      elif value[0] == "\\":
        self.tdc_start_rise = False
    if len(value) > 1:
      if value[1] == "/":
        self.tdc_stop_rise = True
      elif value[1] == "\\":
        self.tdc_stop_rise = False

  @property
  def photocathode_on(self):
    """bool: Commanded photocathode state.

    Setting to ``True`` requests the detector to enable the photocathode
    high voltage.  The actual on/off status is reflected in
    :attr:`photocathode_on_status` after the next :meth:`sync`.

    .. warning::
        Do not expose the photocathode to ambient light without
        appropriate neutral density filters or the protective lens cap.
    """
    return self.read_settings.photocathode_on

  @photocathode_on.setter
  def photocathode_on(self, value):
    self.new_settings.photocathode_on = value

  @property
  def test_generator(self):
    """int: Internal test pulse rate in kHz, or ``0`` if disabled.

    Accepted set values: ``0`` (off), ``10``, ``100``, ``1000`` kHz.
    Useful for verifying the timing chain without an external light source.
    """
    match self.read_settings.test_generator:
      case 1:
        return 10
      case 2:
        return 100
      case 3:
        return 1000
      case _:
        return 0

  @test_generator.setter
  def test_generator(self, value):
    v = 0
    if value < 1:
      v = 0
    if value >= 1:
      v = 1
    if value > 10:
      v = 2
    if value > 100:
      v = 3
    self.new_settings.test_generator = v

  @property
  def stop_threshold(self):
    """int: CFD stop-channel threshold voltage in DAC counts."""
    return self.read_settings.stop_threshold

  @stop_threshold.setter
  def stop_threshold(self, value):
    self.new_settings.stop_threshold = value

  @property
  def stop_zero_cross(self):
    """int: CFD stop-channel zero-crossing voltage in DAC counts."""
    return self.read_settings.stop_zero_cross

  @stop_zero_cross.setter
  def stop_zero_cross(self, value):
    self.new_settings.stop_zero_cross = value

  @property
  def time_offset(self):
    """float: Global timing offset applied to all events, in nanoseconds."""
    return self.read_settings.time_offset

  @time_offset.setter
  def time_offset(self, value):
    self.new_settings.time_offset = value

  @property
  def time_binnig(self):
    """int: TDC bin size in picoseconds per channel."""
    return self.read_settings.time_binnig

  @time_binnig.setter
  def time_binnig(self, value):
    self.new_settings.time_binnig = value

  @property
  def time_gate_on(self):
    """bool: Whether the timing gate is active."""
    return self.read_settings.time_gate_enable

  @time_gate_on.setter
  def time_gate_on(self, value):
    self.new_settings.time_gate_enable = value

  def _ns2gate(self, ns):
    return ns * 1000 / self.time_binnig

  def _gate2ns(self, g):
    return self.time_binnig * g / 1000

  @property
  def time_gate_a(self):
    """float: Timing gate start edge in nanoseconds."""
    return self._gate2ns(self.read_settings.time_gate_a)

  @time_gate_a.setter
  def time_gate_a(self, value):
    self.new_settings.time_gate_a = int(self._ns2gate(value))

  @property
  def time_gate_b(self):
    """float: Timing gate stop edge in nanoseconds."""
    return self._gate2ns(self.read_settings.time_gate_b)

  @time_gate_b.setter
  def time_gate_b(self, value):
    self.new_settings.time_gate_b = int(self._ns2gate(value))

  @property
  def time_gate(self):
    """list[float]: Timing gate ``[start_ns, stop_ns]``, or disables gate on set failure.

    Assign a two-element sequence ``[a, b]`` to set both edges and enable
    the gate.  Assign anything else (e.g. ``None``) to disable the gate.
    """
    return [self.time_gate_a, self.time_gate_b]

  @time_gate.setter
  def time_gate(self, gate):
    try:
      (a, b) = gate
      self.time_gate_a = a
      self.time_gate_b = b
      self.time_gate_on = True
    except:
      self.time_gate_on = False

  # }}}

  # Read-only status {{{
  @property
  def device_connected(self):
    """bool: ``True`` if LINCam Capture has an active connection to the hardware."""
    return self.read_settings.device_connected

  @property
  def device_sn(self):
    """str: Hardware serial number reported by the device."""
    return self.read_settings.device_sn

  @property
  def device_uptime(self):
    """str: Human-readable uptime of the LINCam Capture application."""
    return self.read_settings.device_uptime

  @property
  def mcp_cps(self):
    """float: Total MCP anode count rate in Hz (all photon events, pre-filter)."""
    return self.read_settings.mcp_cps

  @property
  def stop_cps(self):
    """float: CFD stop-channel count rate in Hz."""
    return self.read_settings.stop_cps

  @property
  def acquired_cps(self):
    """float: Net acquired photon count rate in Hz (after all filters/gates)."""
    return self.read_settings.acquired_cps

  @property
  def fpga_temperature(self):
    """float: FPGA die temperature in °C."""
    return self.read_settings.fpga_temperature

  @property
  def detector_temperature(self):
    """float: Detector housing temperature in °C."""
    return self.read_settings.detector_temperature

  @property
  def photocathode_on_status(self):
    """bool: Actual photocathode high-voltage state as reported by the hardware.

    Unlike :attr:`photocathode_on` (the commanded state), this reflects
    whether the HV is actually present.  The overexposure protection
    circuit can disable it even when :attr:`photocathode_on` is ``True``.
    """
    return self.read_settings.photocathode_on_status
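
The test_generator setter in the listing above does not reject unsupported rates; it snaps any number to one of four settings. The sketch below restates that mapping as a pure function so the thresholds are easier to see. The name snap_test_rate_khz is illustrative and not part of the library.

```python
def snap_test_rate_khz(requested_khz: float) -> int:
    """Return the test-pulse rate in kHz that a requested value maps to."""
    # Thresholds copied from the test_generator setter; the returned rates
    # are the ones the getter reports for each internal code.
    if requested_khz < 1:
        return 0          # generator off
    if requested_khz <= 10:
        return 10
    if requested_khz <= 100:
        return 100
    return 1000


for khz in (0, 5, 10, 50, 100, 250, 5000):
    print(khz, "->", snap_test_rate_khz(khz))
```

Under this mapping, any request above 100 kHz ends up at 1000 kHz, so it may be worth passing only the four documented values.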

__init__(target=None)

Connect to the LINCam Capture gRPC server.

Parameters:

target (str | None, default: None): Host and port of the gRPC server, e.g. "192.168.1.10:50051". When None, "localhost:50051" is used.

Source code in LINCam/LINCamRemote.py
def __init__(self, target: str | None = None):
  """Connect to the LINCam Capture gRPC server.

  Args:
      target: Host and port of the gRPC server, e.g. ``"192.168.1.10:50051"``.
          Defaults to ``"localhost:50051"``.
  """
  self.grpc_target = target
  if target is None:
    self.grpc_target = "localhost:50051"
  self.grpc_channel = grpc.insecure_channel(
    self.grpc_target,
    options = [
      ('grpc.max_send_message_length', 1024 * 1024 * 1024),
      ('grpc.max_receive_message_length', 1024 * 1024 * 1024),
    ]
  )
  self.grpc_client = LINCamRemoteStub(self.grpc_channel)
  self.new_settings = LINCamRemoteSettings()
  self.read_settings = LINCamRemoteSettings()
  self.hist_2d = np.array([])
  self.hist_dt = np.array([])
  self.sync()

sync()

Push pending settings to the device and refresh all status fields.

Sends the accumulated new_settings diff to the server. The server returns the full current device state, which is stored in read_settings. After the call, new_settings is reset so the next sync() sends only newly staged changes.

Source code in LINCam/LINCamRemote.py
def sync(self):
  """Push pending settings to the device and refresh all status fields.

  Sends the accumulated ``new_settings`` diff to the server.  The server
  returns the full current device state which is stored in
  ``read_settings``.  After the call ``new_settings`` is reset so the
  next :meth:`sync` only sends newly staged changes.
  """
  self.read_settings = self.grpc_client.Sync(self.new_settings)
  self.new_settings = LINCamRemoteSettings()
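
A stripped-down model of this exchange may make the stage-then-sync behaviour easier to follow. Plain dicts stand in for the protobuf messages and an in-process object stands in for the gRPC server; FakeServer and StagedClient are illustrative names, not part of the library.

```python
class FakeServer:
    """Stand-in for LINCam Capture: holds the 'device' state in a dict."""

    def __init__(self):
        self.state = {"time_offset": 0.0, "photocathode_on": False}

    def sync(self, diff: dict) -> dict:
        self.state.update(diff)      # apply only the staged fields
        return dict(self.state)      # reply with the full current state


class StagedClient:
    """Illustrative client: stage() queues a change, sync() commits it."""

    def __init__(self, server: FakeServer):
        self._server = server
        self._new = {}               # pending changes (the diff)
        self._read = {}              # last state returned by the server
        self.sync()                  # populate status right away

    def stage(self, name: str, value) -> None:
        self._new[name] = value

    def read(self, name: str):
        return self._read[name]

    def sync(self) -> None:
        self._read = self._server.sync(self._new)
        self._new = {}               # the next sync sends only newer changes


client = StagedClient(FakeServer())
client.stage("time_offset", 2.5)
print(client.read("time_offset"))    # still the old value: nothing sent yet
client.sync()
print(client.read("time_offset"))    # the committed value
```

The point of the design is that reads always come from the last server reply, so a staged value never appears in a getter until the device has confirmed it.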

sync_histograms(xy_bins=512, clear_after=False)

Fetch the current 2-D spatial image and timing histogram from the device.

The acquired data is stored in hist_2d (2-D numpy array, shape [xy_bins, xy_bins] after rounding) and hist_dt (1-D numpy array with the delta-t histogram).

The xy_bins parameter is rounded up to the nearest supported resolution: 256, 512, 1024, 2048, or 4096 pixels per side. Values above 4096 are capped at 4096.

Parameters:

xy_bins (int | None, default: 512): Requested edge length of the square spatial image in pixels. Supported values: 256, 512, 1024, 2048, 4096. Pass None to skip the 2-D image (timing only).

clear_after (bool, default: False): If True, the device resets its internal histogram buffers after the data is returned.

Source code in LINCam/LINCamRemote.py
def sync_histograms(self, xy_bins: int | None = 512, clear_after: bool = False):
  """Fetch the current 2-D spatial image and timing histogram from the device.

  The acquired data is stored in :attr:`hist_2d` (2-D numpy array, shape
  ``[xy_bins, xy_bins]``) and :attr:`hist_dt` (1-D numpy array with the
  delta-t histogram).

  The ``xy_bins`` parameter is rounded *up* to the nearest supported
  resolution: 256, 512, 1024, 2048, or 4096 pixels per side.  Values
  above 4096 are capped at 4096.

  Args:
      xy_bins: Requested edge length of the square spatial image in pixels.
          Supported values: 256, 512 (default), 1024, 2048, 4096.
          Pass ``None`` to skip the 2-D image (timing only).
      clear_after: If ``True`` the device resets its internal histogram
          buffers after the data is returned.
  """
  request = HistorgamsRequest()
  request.clear_after = clear_after
  if xy_bins is None:
    request.image_bins = 0
  else:
    if xy_bins <= 256:
      request.image_bins = 5
    elif xy_bins <= 512:
      request.image_bins = 4
    elif xy_bins <= 1024:
      request.image_bins = 3
    elif xy_bins <= 2048:
      request.image_bins = 2
    else:
      request.image_bins = 1
  res = self.grpc_client.GetHistograms(request)
  h2d = np.array(res.image)
  self.hist_2d = np.reshape(h2d, [res.image_size, res.image_size])
  self.hist_dt = np.array(res.delta_t)
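
Going by the branch order in the listing above, a request that falls between two supported sizes selects the larger one. The helper below restates that selection so the resulting image size can be predicted before the call. It assumes that the image_bins codes 5 down to 1 correspond to 256 up to 4096 pixels, which the listing suggests but does not state; effective_xy_bins is an illustrative name, not part of the library.

```python
SUPPORTED_XY_BINS = (256, 512, 1024, 2048, 4096)


def effective_xy_bins(xy_bins):
    """Return the edge length the if/elif chain would select.

    None means "no 2-D image" and is returned unchanged.
    """
    if xy_bins is None:
        return None
    for size in SUPPORTED_XY_BINS:
        if xy_bins <= size:
            return size              # first supported size that fits
    return SUPPORTED_XY_BINS[-1]     # anything larger falls back to the maximum


for request in (100, 256, 300, 1024, 5000, None):
    print(request, "->", effective_xy_bins(request))
```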

setup_recording(filename, filenumber=0, seconds=0, continious=False)

Stage file recording parameters (call sync() to apply).

After calling this method, set is_recording to True and call sync() to start saving photon data to disk.

Parameters:

filename (str, required): Base path for the output .photons file. The device appends filenumber and a timestamp.

filenumber (int, default: 0): Numeric suffix appended to the filename.

seconds (int, default: 0): Chunk duration in seconds. 0 means record until is_recording is set to False.

continious (bool, default: False): If True, the device automatically starts a new chunk file when the current one reaches seconds.

Source code in LINCam/LINCamRemote.py
def setup_recording(
  self,
  filename: str,
  filenumber: int = 0,
  seconds: int = 0,
  continious: bool = False,
):
  """Stage file recording parameters (call :meth:`sync` to apply).

  After calling this method set :attr:`is_recording` to ``True`` and
  call :meth:`sync` to start saving photon data to disk.

  Args:
      filename: Base path for the output ``.photons`` file.
          The device appends ``filenumber`` and a timestamp.
      filenumber: Numeric suffix appended to the filename (default 0).
      seconds: Chunk duration in seconds.  ``0`` means record until
          :attr:`is_recording` is set to ``False``.
      continious: If ``True`` the device automatically starts a new chunk
          file when the current one reaches ``seconds``.
  """
  s = self.new_settings
  s.file_path = filename
  s.file_number = filenumber
  s.record_chunk = seconds
  s.record_continious = continious
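
The three steps (stage the file parameters, set is_recording, call sync()) can be wrapped in a small helper. This is a sketch under stated assumptions, not a library function: record_once is a hypothetical name, the 5-second margin is arbitrary, and it is only assumed that the device clears is_recording by itself once a non-continuous chunk has finished. If it does not, the helper stops the recording explicitly.

```python
import time


def record_once(cam, path: str, seconds: int, poll_s: float = 0.5) -> None:
    """Record a single chunk of `seconds` seconds, then make sure recording is off.

    `cam` is any object that behaves like LINCamRemote here: it needs
    setup_recording(), an is_recording attribute and sync().
    """
    cam.setup_recording(path, filenumber=0, seconds=seconds)
    cam.is_recording = True
    cam.sync()                                   # commit: recording starts

    # Assumption: poll a little longer than the chunk itself (5 s margin).
    deadline = time.monotonic() + seconds + 5
    while cam.is_recording and time.monotonic() < deadline:
        time.sleep(poll_s)
        cam.sync()                               # refresh is_recording

    if cam.is_recording:                         # still on: stop explicitly
        cam.is_recording = False
        cam.sync()
```

A call might look like record_once(cam, "run_a", seconds=10), where cam is a connected LINCamRemote instance and "run_a" is a placeholder base path.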

Device Controls

Properties that configure device behaviour. Stage changes then call sync().

Property         Type  Description
device_url       str   IP:port of the physical LINCam hardware
photocathode_on  bool  Enable / disable photocathode HV
tdc_slopes       str   Edge polarity: "//", "/\\", "\\/", "\\\\"
tdc_start_rise   bool  Start channel: True = rising edge, False = falling edge
tdc_stop_rise    bool  Stop channel: True = rising edge, False = falling edge
stop_threshold   int   CFD threshold voltage (DAC counts)
stop_zero_cross  int   CFD zero-crossing voltage (DAC counts)
test_generator   int   Internal test pulse rate in kHz: 0 (off), 10, 100, 1000
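
The tdc_slopes notation packs two booleans into two characters. The sketch below shows one way to translate between the two forms. parse_slopes and format_slopes are illustrative names, and unlike the library setter, which silently ignores unrecognised characters, this version raises on malformed input.

```python
def parse_slopes(value: str) -> tuple[bool, bool]:
    """Translate slope notation into (start_rising, stop_rising)."""
    if len(value) != 2 or any(ch not in "/\\" for ch in value):
        raise ValueError(f"expected two characters from '/' and '\\', got {value!r}")
    return value[0] == "/", value[1] == "/"


def format_slopes(start_rising: bool, stop_rising: bool) -> str:
    """Inverse of parse_slopes: build the two-character notation."""
    return ("/" if start_rising else "\\") + ("/" if stop_rising else "\\")


for notation in ("//", "/\\", "\\/", "\\\\"):
    start, stop = parse_slopes(notation)
    print(notation, "-> start rising:", start, "| stop rising:", stop)
```

In Python source a single backslash inside an ordinary string literal is written "\\", which is why the falling-edge entries in the table appear doubled.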

Source code in LINCam/LINCamRemote.py
class LINCamRemote:
  """Remote control handle for a LINCam TCSPC camera.

  Wraps the gRPC ``LINCamRemote`` service exposed by the LINCam Capture
  application.  All settable parameters are Python properties that stage
  changes locally; call :meth:`sync` to commit and read back device state.

  After construction the object immediately calls :meth:`sync` so that
  all read-only properties (serial number, count rates, temperatures, …)
  are populated.

  Attributes:
      hist_2d: Last acquired 2-D spatial histogram as a square numpy array.
          Updated by :meth:`sync_histograms`.
      hist_dt: Last acquired timing histogram (delta-t) as a 1-D numpy array.
          Updated by :meth:`sync_histograms`.
  """

  def __init__(self, target: str | None = None):
    """Connect to the LINCam Capture gRPC server.

    Args:
        target: Host and port of the gRPC server, e.g. ``"192.168.1.10:50051"``.
            Defaults to ``"localhost:50051"``.
    """
    self.grpc_target = target
    if target is None:
      self.grpc_target = "localhost:50051"
    self.grpc_channel = grpc.insecure_channel(
      self.grpc_target,
      options = [
        ('grpc.max_send_message_length', 1024 * 1024 * 1024),
        ('grpc.max_receive_message_length', 1024 * 1024 * 1024),
      ]
    )
    self.grpc_client = LINCamRemoteStub(self.grpc_channel)
    self.new_settings = LINCamRemoteSettings()
    self.read_settings = LINCamRemoteSettings()
    self.hist_2d = np.array([])
    self.hist_dt = np.array([])
    self.sync()

  def sync(self):
    """Push pending settings to the device and refresh all status fields.

    Sends the accumulated ``new_settings`` diff to the server.  The server
    returns the full current device state which is stored in
    ``read_settings``.  After the call ``new_settings`` is reset so the
    next :meth:`sync` only sends newly staged changes.
    """
    self.read_settings = self.grpc_client.Sync(self.new_settings)
    self.new_settings = LINCamRemoteSettings()

  def sync_histograms(self, xy_bins: int | None = 512, clear_after: bool = False):
    """Fetch the current 2-D spatial image and timing histogram from the device.

    The acquired data is stored in :attr:`hist_2d` (2-D numpy array, shape
    ``[xy_bins, xy_bins]``) and :attr:`hist_dt` (1-D numpy array with the
    delta-t histogram).

    The ``xy_bins`` parameter is rounded *up* to the nearest supported
    resolution: 256, 512, 1024, 2048, or 4096 pixels per side.  Values
    above 4096 are capped at 4096.

    Args:
        xy_bins: Requested edge length of the square spatial image in pixels.
            Supported values: 256, 512 (default), 1024, 2048, 4096.
            Pass ``None`` to skip the 2-D image (timing only).
        clear_after: If ``True`` the device resets its internal histogram
            buffers after the data is returned.
    """
    request = HistorgamsRequest()
    request.clear_after = clear_after
    if xy_bins is None:
      request.image_bins = 0
    else:
      if xy_bins <= 256:
        request.image_bins = 5
      elif xy_bins <= 512:
        request.image_bins = 4
      elif xy_bins <= 1024:
        request.image_bins = 3
      elif xy_bins <= 2048:
        request.image_bins = 2
      else:
        request.image_bins = 1
    res = self.grpc_client.GetHistograms(request)
    h2d = np.array(res.image)
    self.hist_2d = np.reshape(h2d, [res.image_size, res.image_size])
    self.hist_dt = np.array(res.delta_t)

  def __repr__(self) -> str:
    return self.__str__()

  def __str__(self) -> str:
    if not self.device_connected:
      return f"Not connected URL {self.device_url}"
    test_gen = "OFF" if self.test_generator == 0 else f"{self.test_generator} KHz"
    t_gate = "-" if not self.time_gate_on else f"[{self.time_gate_a} .. {self.time_gate_b}]"
    pc = "off"
    if self.photocathode_on:
      pc = "inactive"
    if self.photocathode_on_status:
      pc = "ACTIVE"

    cps_mcp = LINCamRemote.format_cps(self.mcp_cps)
    cps_acquired = LINCamRemote.format_cps(self.acquired_cps)
    cps_stop = LINCamRemote.format_cps(self.stop_cps)

    return "\n".join(
      [
        f"Device:          {self.device_sn} @ {self.device_url} ({self.device_uptime})",
        f"TDC Slopes:      {self.tdc_slopes}",
        f"Photocathode:    {pc}",
        f"Test generator:  {test_gen}",
        f"CFD thr/zero:    {self.stop_threshold}/{self.stop_zero_cross}",
        f"Timing",  #
        f"   binning:      {self.time_binnig} ps/ch",
        f"    offset:      {self.time_offset:.2f}ns",
        f"      gate:      {t_gate} ns",
        f"CPS",  #
        f"       MCP:      {cps_mcp}",
        f"  Acquired:      {cps_acquired}",
        f"      Stop:      {cps_stop}",
      ]
    )

  @staticmethod
  def format_cps(cps_hz):
    if (cps_hz < 1000):
      return f"{cps_hz} Hz"
    elif (cps_hz < 10_000):
      return f"{cps_hz / 1000:.2f} kHz"
    elif (cps_hz < 100_000):
      return f"{cps_hz / 1000:.1f} kHz"
    elif (cps_hz < 1000_000):
      return f"{int(cps_hz / 1000)} kHz"
    elif (cps_hz < 10_000_000):
      return f"{cps_hz / 1000_000:.2f} MHz"
    else:
      return f"{int(cps_hz / 1000_000)} MHz"

  def setup_recording(
    self,
    filename: str,
    filenumber: int = 0,
    seconds: int = 0,
    continious: bool = False,
  ):
    """Stage file recording parameters (call :meth:`sync` to apply).

    After calling this method set :attr:`is_recording` to ``True`` and
    call :meth:`sync` to start saving photon data to disk.

    Args:
        filename: Base path for the output ``.photons`` file.
            The device appends ``filenumber`` and a timestamp.
        filenumber: Numeric suffix appended to the filename (default 0).
        seconds: Chunk duration in seconds.  ``0`` means record until
            :attr:`is_recording` is set to ``False``.
        continious: If ``True`` the device automatically starts a new chunk
            file when the current one reaches ``seconds``.
    """
    s = self.new_settings
    s.file_path = filename
    s.file_number = filenumber
    s.record_chunk = seconds
    s.record_continious = continious

  @property
  def is_recording(self):
    """bool: Whether the device is currently writing photon data to disk."""
    return self.read_settings.is_recording

  @is_recording.setter
  def is_recording(self, value):
    self.new_settings.is_recording = value

  # Device controls {{{
  @property
  def device_url(self):
    """str: IP address and port of the physical LINCam device (e.g. ``"192.168.1.42:5000"``).

    Setting this tells LINCam Capture which hardware unit to connect to.
    Takes effect on the next :meth:`sync`.
    """
    return self.read_settings.device_url

  @device_url.setter
  def device_url(self, value):
    self.new_settings.device_url = value

  @property
  def tdc_start_rise(self):
    """bool: TDC start channel edge polarity — ``True`` = rising, ``False`` = falling."""
    return self.read_settings.tdc_start_rise

  @tdc_start_rise.setter
  def tdc_start_rise(self, value):
    self.new_settings.tdc_start_rise = value

  @property
  def tdc_stop_rise(self):
    """bool: TDC stop channel edge polarity — ``True`` = rising, ``False`` = falling."""
    return self.read_settings.tdc_stop_rise

  @tdc_stop_rise.setter
  def tdc_stop_rise(self, value):
    self.new_settings.tdc_stop_rise = value

  @property
  def tdc_slopes(self):
    """str: Compact slope notation for start and stop channels.

    A two-character string where ``"/"`` means rising and ``"\\"`` means
    falling edge.  For example ``"/\\"`` means start-rising, stop-falling.

    Setting this property updates :attr:`tdc_start_rise` and
    :attr:`tdc_stop_rise` simultaneously.
    """
    start = "/" if self.tdc_start_rise else "\\"
    stop = "/" if self.tdc_stop_rise else "\\"
    return start + stop

  @tdc_slopes.setter
  def tdc_slopes(self, value):
    if not isinstance(value, str):
      raise TypeError("tdc_slopes must be a str")
    # Characters other than "/" and "\" leave that channel's setting unchanged.
    if len(value) > 0:
      if value[0] == "/":
        self.tdc_start_rise = True
      elif value[0] == "\\":
        self.tdc_start_rise = False
    if len(value) > 1:
      if value[1] == "/":
        self.tdc_stop_rise = True
      elif value[1] == "\\":
        self.tdc_stop_rise = False

  @property
  def photocathode_on(self):
    """bool: Commanded photocathode state.

    Setting to ``True`` requests the detector to enable the photocathode
    high voltage.  The actual on/off status is reflected in
    :attr:`photocathode_on_status` after the next :meth:`sync`.

    .. warning::
        Do not expose the photocathode to ambient light without
        appropriate neutral density filters or the protective lens cap.
    """
    return self.read_settings.photocathode_on

  @photocathode_on.setter
  def photocathode_on(self, value):
    self.new_settings.photocathode_on = value

  @property
  def test_generator(self):
    """int: Internal test pulse rate in kHz, or ``0`` if disabled.

    Accepted set values: ``0`` (off), ``10``, ``100``, ``1000`` kHz.
    Useful for verifying the timing chain without an external light source.
    """
    match self.read_settings.test_generator:
      case 1:
        return 10
      case 2:
        return 100
      case 3:
        return 1000
      case _:
        return 0

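  @test_generator.setter
  def test_generator(self, value):
    # Map the requested rate in kHz to the device code, rounding up to the
    # next supported rate and capping at 1000 kHz:
    # 0 = off, 1 = 10 kHz, 2 = 100 kHz, 3 = 1000 kHz.
    if value > 100:
      v = 3
    elif value > 10:
      v = 2
    elif value >= 1:
      v = 1
    else:
      v = 0
    self.new_settings.test_generator = v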

  @property
  def stop_threshold(self):
    """int: CFD stop-channel threshold voltage in DAC counts."""
    return self.read_settings.stop_threshold

  @stop_threshold.setter
  def stop_threshold(self, value):
    self.new_settings.stop_threshold = value

  @property
  def stop_zero_cross(self):
    """int: CFD stop-channel zero-crossing voltage in DAC counts."""
    return self.read_settings.stop_zero_cross

  @stop_zero_cross.setter
  def stop_zero_cross(self, value):
    self.new_settings.stop_zero_cross = value

  @property
  def time_offset(self):
    """float: Global timing offset applied to all events, in nanoseconds."""
    return self.read_settings.time_offset

  @time_offset.setter
  def time_offset(self, value):
    self.new_settings.time_offset = value

  @property
  def time_binnig(self):
    """int: TDC bin size in picoseconds per channel."""
    return self.read_settings.time_binnig

  @time_binnig.setter
  def time_binnig(self, value):
    self.new_settings.time_binnig = value

  @property
  def time_gate_on(self):
    """bool: Whether the timing gate is active."""
    return self.read_settings.time_gate_enable

  @time_gate_on.setter
  def time_gate_on(self, value):
    self.new_settings.time_gate_enable = value

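  def _ns2gate(self, ns):
    # Nanoseconds to gate channels, using the last-synced bin size (ps/channel).
    return ns * 1000 / self.time_binnig

  def _gate2ns(self, g):
    # Gate channels back to nanoseconds, using the last-synced bin size.
    return self.time_binnig * g / 1000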

  @property
  def time_gate_a(self):
    """float: Timing gate start edge in nanoseconds."""
    return self._gate2ns(self.read_settings.time_gate_a)

  @time_gate_a.setter
  def time_gate_a(self, value):
    self.new_settings.time_gate_a = int(self._ns2gate(value))

  @property
  def time_gate_b(self):
    """float: Timing gate stop edge in nanoseconds."""
    return self._gate2ns(self.read_settings.time_gate_b)

  @time_gate_b.setter
  def time_gate_b(self, value):
    self.new_settings.time_gate_b = int(self._ns2gate(value))

  @property
  def time_gate(self):
    """list[float]: Timing gate ``[start_ns, stop_ns]`` in nanoseconds.

    Assign a two-element sequence ``[a, b]`` to set both edges and enable
    the gate.  Assigning anything else (e.g. ``None``) disables the gate.
    """
    return [self.time_gate_a, self.time_gate_b]

  @time_gate.setter
  def time_gate(self, gate):
    try:
      (a, b) = gate
      self.time_gate_a = a
      self.time_gate_b = b
      self.time_gate_on = True
    except Exception:
      # Any value that cannot be unpacked and converted disables the gate.
      self.time_gate_on = False

  # }}}

  # Read-only status {{{
  @property
  def device_connected(self):
    """bool: ``True`` if LINCam Capture has an active connection to the hardware."""
    return self.read_settings.device_connected

  @property
  def device_sn(self):
    """str: Hardware serial number reported by the device."""
    return self.read_settings.device_sn

  @property
  def device_uptime(self):
    """str: Human-readable uptime of the LINCam Capture application."""
    return self.read_settings.device_uptime

  @property
  def mcp_cps(self):
    """float: Total MCP anode count rate in Hz (all photon events, pre-filter)."""
    return self.read_settings.mcp_cps

  @property
  def stop_cps(self):
    """float: CFD stop-channel count rate in Hz."""
    return self.read_settings.stop_cps

  @property
  def acquired_cps(self):
    """float: Net acquired photon count rate in Hz (after all filters/gates)."""
    return self.read_settings.acquired_cps

  @property
  def fpga_temperature(self):
    """float: FPGA die temperature in °C."""
    return self.read_settings.fpga_temperature

  @property
  def detector_temperature(self):
    """float: Detector housing temperature in °C."""
    return self.read_settings.detector_temperature

  @property
  def photocathode_on_status(self):
    """bool: Actual photocathode high-voltage state as reported by the hardware.

    Unlike :attr:`photocathode_on` (the commanded state), this reflects
    whether the HV is actually present.  The overexposure protection
    circuit can disable it even when :attr:`photocathode_on` is ``True``.
    """
    return self.read_settings.photocathode_on_status

device_url property writable

str: IP address and port of the physical LINCam device (e.g. "192.168.1.42:5000").

Setting this tells LINCam Capture which hardware unit to connect to. Takes effect on the next sync() call.

photocathode_on property writable

bool: Commanded photocathode state.

Setting to True requests the detector to enable the photocathode high voltage. The actual on/off status is reflected in photocathode_on_status after the next sync() call.

Warning: Do not expose the photocathode to ambient light without appropriate neutral density filters or the protective lens cap.
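Because the commanded state and the hardware-reported state can differ, a script may want to wait until the high voltage is actually present before acquiring. The following is a minimal sketch, not part of the library: `wait_for_photocathode` is a hypothetical helper, and `FakeCam` is a stand-in used only so the example runs without hardware. With a real device you would pass a LINCamRemote instance instead.

```python
import time


def wait_for_photocathode(cam, timeout_s=5.0, poll_s=0.1):
    """Request photocathode HV and poll until the hardware reports it active.

    Hypothetical helper, not part of LINCamRemote. `cam` is any object with
    `photocathode_on`, `photocathode_on_status` and `sync()`.
    Returns True if the status became active before the timeout.
    """
    cam.photocathode_on = True
    deadline = time.monotonic() + timeout_s
    while True:
        cam.sync()  # push the staged request and refresh the status
        if cam.photocathode_on_status:
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_s)


class FakeCam:
    """Stand-in for demonstration only: HV comes up on the second sync()."""

    def __init__(self):
        self.photocathode_on = False
        self.photocathode_on_status = False
        self._syncs = 0

    def sync(self):
        self._syncs += 1
        self.photocathode_on_status = self.photocathode_on and self._syncs >= 2


cam = FakeCam()
ready = wait_for_photocathode(cam, timeout_s=1.0, poll_s=0.01)
```

A False return does not say why the voltage is absent. One possible cause is the overexposure protection circuit described under photocathode_on_status.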

tdc_slopes property writable

str: Compact slope notation for start and stop channels.

A two-character string where "/" means rising and "\" means falling edge. For example "/\" means start-rising, stop-falling.

Setting this property updates tdc_start_rise and tdc_stop_rise simultaneously.
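The slope notation can be illustrated with a small standalone sketch. The functions `parse_slopes` and `format_slopes` are hypothetical names introduced here; they only mirror the behaviour shown in the class source, where characters other than "/" and "\" leave the corresponding channel unchanged.

```python
def parse_slopes(value, current=(True, True)):
    """Return (start_rise, stop_rise) after applying a slope string.

    "/" selects the rising edge and a backslash the falling edge.
    Any other character leaves that channel as given in `current`.
    """
    if not isinstance(value, str):
        raise TypeError("slope notation must be a string")
    edges = list(current)
    for i, ch in enumerate(value[:2]):
        if ch == "/":
            edges[i] = True
        elif ch == "\\":
            edges[i] = False
    return tuple(edges)


def format_slopes(start_rise, stop_rise):
    """Inverse of parse_slopes: build the two-character notation."""
    return ("/" if start_rise else "\\") + ("/" if stop_rise else "\\")


start_rise, stop_rise = parse_slopes("/\\")      # start rising, stop falling
round_trip = format_slopes(start_rise, stop_rise)
```

Note that a backslash must be escaped in an ordinary Python string literal, so start-rising, stop-falling is written as "/\\" in code.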

tdc_start_rise property writable

bool: TDC start channel edge polarity — True = rising, False = falling.

tdc_stop_rise property writable

bool: TDC stop channel edge polarity — True = rising, False = falling.

stop_threshold property writable

int: CFD stop-channel threshold voltage in DAC counts.

stop_zero_cross property writable

int: CFD stop-channel zero-crossing voltage in DAC counts.

test_generator property writable

int: Internal test pulse rate in kHz, or 0 if disabled.

Accepted set values: 0 (off), 10, 100, 1000 kHz. Useful for verifying the timing chain without an external light source.
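Values other than the listed rates are not rejected. Judging from the setter in the class source, they are mapped to the next supported rate and capped at 1000 kHz. The sketch below reproduces that mapping in isolation; `rate_to_code` and `code_to_rate` are hypothetical names, and the numeric device codes are taken from the listing.

```python
# Device codes as used in the class source:
# 0 = off, 1 = 10 kHz, 2 = 100 kHz, 3 = 1000 kHz.
CODE_TO_KHZ = {1: 10, 2: 100, 3: 1000}


def rate_to_code(khz):
    """Map a requested rate in kHz to a device code, rounding up."""
    if khz > 100:
        return 3
    if khz > 10:
        return 2
    if khz >= 1:
        return 1
    return 0


def code_to_rate(code):
    """Map a device code back to kHz; unknown codes read as 0 (off)."""
    return CODE_TO_KHZ.get(code, 0)


# Effective rate for a few requested values.
effective = {req: code_to_rate(rate_to_code(req)) for req in (0, 5, 10, 50, 100, 500, 5000)}
```

Reading the property back after sync() is therefore the reliable way to learn which rate is in effect.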

Timing

| Property | Type | Description |
|---|---|---|
| time_offset | float | Global timing offset in ns |
| time_binnig | int | Bin size in ps/channel |
| time_gate | list[float] | Gate [start_ns, stop_ns]; set None to disable |
| time_gate_on | bool | Gate enable/disable |
| time_gate_a | float | Gate start in ns |
| time_gate_b | float | Gate stop in ns |
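According to the `_ns2gate` and `_gate2ns` helpers in the class source, gate edges are stored as whole TDC channels and converted with the bin size from the last sync(). A value in nanoseconds is therefore truncated to a multiple of the bin size. The sketch below shows the arithmetic with two hypothetical standalone functions; it does not talk to a device.

```python
def ns_to_channels(ns, binning_ps):
    """Convert nanoseconds to whole TDC channels (truncating toward zero)."""
    return int(ns * 1000 / binning_ps)


def channels_to_ns(channels, binning_ps):
    """Convert TDC channels back to nanoseconds."""
    return binning_ps * channels / 1000


binning_ps = 50          # 50 ps per channel
requested_ns = 10.03     # not a multiple of the bin size
channels = ns_to_channels(requested_ns, binning_ps)
effective_ns = channels_to_ns(channels, binning_ps)
```

Because the conversion uses the last-synced bin size, changing time_binnig and the gate in the same sync() converts the gate with the old bin size. Syncing the new bin size first avoids that.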


time_offset property writable

float: Global timing offset applied to all events, in nanoseconds.

time_binnig property writable

int: TDC bin size in picoseconds per channel.

time_gate property writable

list[float]: Timing gate [start_ns, stop_ns] in nanoseconds.

Assign a two-element sequence [a, b] to set both edges and enable the gate. Assign anything else (e.g. None) to disable the gate.
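Since an unsuitable value disables the gate silently, a caller may prefer to check the value before assigning it. The sketch below is one way to do that. `normalize_gate` is a hypothetical helper, and the check that the start edge does not exceed the stop edge is an assumption added here, not something the library is known to enforce.

```python
def normalize_gate(gate):
    """Return (enabled, start_ns, stop_ns) for a requested gate value.

    A two-element sequence enables the gate; anything that cannot be
    unpacked into two values (e.g. None) disables it.
    """
    try:
        start_ns, stop_ns = gate
    except (TypeError, ValueError):
        return (False, None, None)
    start_ns, stop_ns = float(start_ns), float(stop_ns)
    if start_ns > stop_ns:
        # Assumption for this sketch: treat a reversed gate as a caller error.
        raise ValueError("gate start must not exceed gate stop")
    return (True, start_ns, stop_ns)


enabled_gate = normalize_gate([10.0, 50.0])
disabled_gate = normalize_gate(None)
wrong_length = normalize_gate([1.0, 2.0, 3.0])
```

With a real handle, the result could then be applied as `cam.time_gate = [start_ns, stop_ns]` when enabled and `cam.time_gate = None` otherwise, followed by sync().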

time_gate_on property writable

bool: Whether the timing gate is active.

time_gate_a property writable

float: Timing gate start edge in nanoseconds.

time_gate_b property writable

float: Timing gate stop edge in nanoseconds.

Recording

cam.setup_recording("/data/run01", filenumber=0, seconds=10)
cam.is_recording = True
cam.sync()
# ... wait ...
cam.is_recording = False
cam.sync()
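The start/stop sequence above can be wrapped so that recording is stopped even if the script is interrupted while waiting. This is a sketch under stated assumptions: `record_for` is a hypothetical helper, timing is done on the client with `time.sleep`, and `FakeCam` is a stand-in that only logs calls so the example runs without hardware.

```python
import time


def record_for(cam, path, seconds, filenumber=0):
    """Record photon data for roughly `seconds`, then stop.

    Hypothetical helper. `cam` is any object exposing `setup_recording()`,
    `is_recording` and `sync()`.
    """
    cam.setup_recording(path, filenumber=filenumber)
    cam.is_recording = True
    cam.sync()  # start writing to disk
    try:
        time.sleep(seconds)
    finally:
        # Always stage and push the stop request, even on Ctrl-C.
        cam.is_recording = False
        cam.sync()


class FakeCam:
    """Stand-in for demonstration only: records the calls it receives."""

    def __init__(self):
        self.is_recording = False
        self.log = []

    def setup_recording(self, filename, filenumber=0, seconds=0, continious=False):
        self.log.append(("setup", filename, filenumber))

    def sync(self):
        self.log.append(("sync", self.is_recording))


cam = FakeCam()
record_for(cam, "/data/run01", seconds=0.01)
```

The helper leaves the device-side chunk duration at its default of 0, which the setup_recording docstring describes as recording until is_recording is set to False.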


Source code in LINCam/LINCamRemote.py
class LINCamRemote:
  """Remote control handle for a LINCam TCSPC camera.

  Wraps the gRPC ``LINCamRemote`` service exposed by the LINCam Capture
  application.  All settable parameters are Python properties that stage
  changes locally; call :meth:`sync` to commit and read back device state.

  After construction the object immediately calls :meth:`sync` so that
  all read-only properties (serial number, count rates, temperatures, …)
  are populated.

  Attributes:
      hist_2d: Last acquired 2-D spatial histogram as a square numpy array.
          Updated by :meth:`sync_histograms`.
      hist_dt: Last acquired timing histogram (delta-t) as a 1-D numpy array.
          Updated by :meth:`sync_histograms`.
  """

  def __init__(self, target: str | None = None):
    """Connect to the LINCam Capture gRPC server.

    Args:
        target: Host and port of the gRPC server, e.g. ``"192.168.1.10:50051"``.
            Defaults to ``"localhost:50051"``.
    """
    self.grpc_target = target
    if target is None:
      self.grpc_target = "localhost:50051"
    self.grpc_channel = grpc.insecure_channel(
      self.grpc_target,
      options = [
        ('grpc.max_send_message_length', 1024 * 1024 * 1024),
        ('grpc.max_receive_message_length', 1024 * 1024 * 1024),
      ]
    )
    self.grpc_client = LINCamRemoteStub(self.grpc_channel)
    self.new_settings = LINCamRemoteSettings()
    self.read_settings = LINCamRemoteSettings()
    self.hist_2d = np.array([])
    self.hist_dt = np.array([])
    self.sync()

  def sync(self):
    """Push pending settings to the device and refresh all status fields.

    Sends the accumulated ``new_settings`` diff to the server.  The server
    returns the full current device state which is stored in
    ``read_settings``.  After the call ``new_settings`` is reset so the
    next :meth:`sync` only sends newly staged changes.
    """
    self.read_settings = self.grpc_client.Sync(self.new_settings)
    self.new_settings = LINCamRemoteSettings()

  def sync_histograms(self, xy_bins: int | None = 512, clear_after: bool = False):
    """Fetch the current 2-D spatial image and timing histogram from the device.

    The acquired data is stored in :attr:`hist_2d` (square 2-D numpy array
    whose edge length is the resolution actually used) and :attr:`hist_dt`
    (1-D numpy array with the delta-t histogram).

    The ``xy_bins`` parameter is rounded *up* to the nearest supported
    resolution: 256, 512, 1024, 2048, or 4096 pixels per side.  Values
    above 4096 are capped at 4096.

    Args:
        xy_bins: Requested edge length of the square spatial image in pixels.
            Supported values: 256, 512 (default), 1024, 2048, 4096.
            Pass ``None`` to skip the 2-D image (timing only).
        clear_after: If ``True`` the device resets its internal histogram
            buffers after the data is returned.
    """
    request = HistorgamsRequest()
    request.clear_after = clear_after
    if xy_bins is None:
      request.image_bins = 0
    else:
      if xy_bins <= 256:
        request.image_bins = 5
      elif xy_bins <= 512:
        request.image_bins = 4
      elif xy_bins <= 1024:
        request.image_bins = 3
      elif xy_bins <= 2048:
        request.image_bins = 2
      else:
        request.image_bins = 1
    res = self.grpc_client.GetHistograms(request)
    h2d = np.array(res.image)
    self.hist_2d = np.reshape(h2d, [res.image_size, res.image_size])
    self.hist_dt = np.array(res.delta_t)

  def __repr__(self) -> str:
    return self.__str__()

  def __str__(self) -> str:
    if not self.device_connected:
      return f"Not connected (device URL: {self.device_url})"
    test_gen = "OFF" if self.test_generator == 0 else f"{self.test_generator} kHz"
    t_gate = "-" if not self.time_gate_on else f"[{self.time_gate_a} .. {self.time_gate_b}]"
    pc = "off"
    if self.photocathode_on:
      pc = "inactive"
    if self.photocathode_on_status:
      pc = "ACTIVE"

    cps_mcp = LINCamRemote.format_cps(self.mcp_cps)
    cps_acquired = LINCamRemote.format_cps(self.acquired_cps)
    cps_stop = LINCamRemote.format_cps(self.stop_cps)

    return "\n".join(
      [
        f"Device:          {self.device_sn} @ {self.device_url} ({self.device_uptime})",
        f"TDC Slopes:      {self.tdc_slopes}",
        f"Photocathode:    {pc}",
        f"Test generator:  {test_gen}",
        f"CFD thr/zero:    {self.stop_threshold}/{self.stop_zero_cross}",
        f"Timing",  #
        f"   binning:      {self.time_binnig} ps/ch",
        f"    offset:      {self.time_offset:.2f}ns",
        f"      gate:      {t_gate} ns",
        f"CPS",  #
        f"       MCP:      {cps_mcp}",
        f"  Acquired:      {cps_acquired}",
        f"      Stop:      {cps_stop}",
      ]
    )

  @staticmethod
  def format_cps(cps_hz):
    """Format a count rate given in Hz as a short string in Hz, kHz, or MHz."""
    if cps_hz < 1_000:
      return f"{cps_hz} Hz"
    elif cps_hz < 10_000:
      return f"{cps_hz / 1_000:.2f} kHz"
    elif cps_hz < 100_000:
      return f"{cps_hz / 1_000:.1f} kHz"
    elif cps_hz < 1_000_000:
      return f"{int(cps_hz / 1_000)} kHz"
    elif cps_hz < 10_000_000:
      return f"{cps_hz / 1_000_000:.2f} MHz"
    else:
      return f"{int(cps_hz / 1_000_000)} MHz"

  def setup_recording(
    self,
    filename: str,
    filenumber: int = 0,
    seconds: int = 0,
    continious: bool = False,
  ):
    """Stage file recording parameters (call :meth:`sync` to apply).

    After calling this method set :attr:`is_recording` to ``True`` and
    call :meth:`sync` to start saving photon data to disk.

    Args:
        filename: Base path for the output ``.photons`` file.
            The device appends ``filenumber`` and a timestamp.
        filenumber: Numeric suffix appended to the filename (default 0).
        seconds: Chunk duration in seconds.  ``0`` means record until
            :attr:`is_recording` is set to ``False``.
        continious: If ``True`` the device automatically starts a new chunk
            file when the current one reaches ``seconds``.
    """
    s = self.new_settings
    s.file_path = filename
    s.file_number = filenumber
    s.record_chunk = seconds
    s.record_continious = continious

  @property
  def is_recording(self):
    """bool: Whether the device is currently writing photon data to disk."""
    return self.read_settings.is_recording

  @is_recording.setter
  def is_recording(self, value):
    self.new_settings.is_recording = value

  # Device controls {{{
  @property
  def device_url(self):
    """str: IP address and port of the physical LINCam device (e.g. ``"192.168.1.42:5000"``).

    Setting this tells LINCam Capture which hardware unit to connect to.
    Takes effect on the next :meth:`sync`.
    """
    return self.read_settings.device_url

  @device_url.setter
  def device_url(self, value):
    self.new_settings.device_url = value

  @property
  def tdc_start_rise(self):
    """bool: TDC start channel edge polarity — ``True`` = rising, ``False`` = falling."""
    return self.read_settings.tdc_start_rise

  @tdc_start_rise.setter
  def tdc_start_rise(self, value):
    self.new_settings.tdc_start_rise = value

  @property
  def tdc_stop_rise(self):
    """bool: TDC stop channel edge polarity — ``True`` = rising, ``False`` = falling."""
    return self.read_settings.tdc_stop_rise

  @tdc_stop_rise.setter
  def tdc_stop_rise(self, value):
    self.new_settings.tdc_stop_rise = value

  @property
  def tdc_slopes(self):
    """str: Compact slope notation for start and stop channels.

    A two-character string where ``"/"`` means rising and ``"\\"`` means
    falling edge.  For example ``"/\\"`` means start-rising, stop-falling.

    Setting this property updates :attr:`tdc_start_rise` and
    :attr:`tdc_stop_rise` simultaneously.  Characters other than ``"/"``
    and ``"\\"`` leave the corresponding channel unchanged.
    """
    start = "/" if self.tdc_start_rise else "\\"
    stop = "/" if self.tdc_stop_rise else "\\"
    return start + stop

  @tdc_slopes.setter
  def tdc_slopes(self, value):
    if not isinstance(value, str):
      raise TypeError("tdc_slopes must be a string")
    if len(value) > 0:
      if value[0] == "/":
        self.tdc_start_rise = True
      elif value[0] == "\\":
        self.tdc_start_rise = False
    if len(value) > 1:
      if value[1] == "/":
        self.tdc_stop_rise = True
      elif value[1] == "\\":
        self.tdc_stop_rise = False

  @property
  def photocathode_on(self):
    """bool: Commanded photocathode state.

    Setting to ``True`` requests the detector to enable the photocathode
    high voltage.  The actual on/off status is reflected in
    :attr:`photocathode_on_status` after the next :meth:`sync`.

    .. warning::
        Do not expose the photocathode to ambient light without
        appropriate neutral density filters or the protective lens cap.
    """
    return self.read_settings.photocathode_on

  @photocathode_on.setter
  def photocathode_on(self, value):
    self.new_settings.photocathode_on = value

  @property
  def test_generator(self):
    """int: Internal test pulse rate in kHz, or ``0`` if disabled.

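    Accepted set values: ``0`` (off), ``10``, ``100``, ``1000`` kHz.
    Values below 1 disable the generator; other values are rounded up to
    the next supported rate, capped at 1000 kHz.
    Useful for verifying the timing chain without an external light source.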
    """
    match self.read_settings.test_generator:
      case 1:
        return 10
      case 2:
        return 100
      case 3:
        return 1000
      case _:
        return 0

  @test_generator.setter
  def test_generator(self, value):
    # Map the requested rate in kHz to the device's rate code (0 = off).
    if value > 100:
      v = 3  # 1000 kHz
    elif value > 10:
      v = 2  # 100 kHz
    elif value >= 1:
      v = 1  # 10 kHz
    else:
      v = 0  # off
    self.new_settings.test_generator = v

  @property
  def stop_threshold(self):
    """int: CFD stop-channel threshold voltage in DAC counts."""
    return self.read_settings.stop_threshold

  @stop_threshold.setter
  def stop_threshold(self, value):
    self.new_settings.stop_threshold = value

  @property
  def stop_zero_cross(self):
    """int: CFD stop-channel zero-crossing voltage in DAC counts."""
    return self.read_settings.stop_zero_cross

  @stop_zero_cross.setter
  def stop_zero_cross(self, value):
    self.new_settings.stop_zero_cross = value

  @property
  def time_offset(self):
    """float: Global timing offset applied to all events, in nanoseconds."""
    return self.read_settings.time_offset

  @time_offset.setter
  def time_offset(self, value):
    self.new_settings.time_offset = value

  @property
  def time_binnig(self):
    """int: TDC bin size in picoseconds per channel."""
    return self.read_settings.time_binnig

  @time_binnig.setter
  def time_binnig(self, value):
    self.new_settings.time_binnig = value

  @property
  def time_gate_on(self):
    """bool: Whether the timing gate is active."""
    return self.read_settings.time_gate_enable

  @time_gate_on.setter
  def time_gate_on(self, value):
    self.new_settings.time_gate_enable = value

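  def _ns2gate(self, ns):
    """Convert nanoseconds to TDC channels using the last synced :attr:`time_binnig`."""
    return ns * 1000 / self.time_binnig

  def _gate2ns(self, g):
    """Convert TDC channels to nanoseconds using the last synced :attr:`time_binnig`."""
    return self.time_binnig * g / 1000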

  @property
  def time_gate_a(self):
    """float: Timing gate start edge in nanoseconds."""
    return self._gate2ns(self.read_settings.time_gate_a)

  @time_gate_a.setter
  def time_gate_a(self, value):
    self.new_settings.time_gate_a = int(self._ns2gate(value))

  @property
  def time_gate_b(self):
    """float: Timing gate stop edge in nanoseconds."""
    return self._gate2ns(self.read_settings.time_gate_b)

  @time_gate_b.setter
  def time_gate_b(self, value):
    self.new_settings.time_gate_b = int(self._ns2gate(value))

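  @property
  def time_gate(self):
    """list[float]: Timing gate edges ``[start_ns, stop_ns]`` in nanoseconds.

    Assign a two-element sequence ``[a, b]`` to set both edges and enable
    the gate.  Assign anything else (e.g. ``None``) to disable the gate.
    The edges are converted to TDC channels with the binning read at the
    last :meth:`sync`, so a binning change staged in the same batch is not
    taken into account by the conversion.
    """
    return [self.time_gate_a, self.time_gate_b]

  @time_gate.setter
  def time_gate(self, gate):
    try:
      (a, b) = gate
      self.time_gate_a = a
      self.time_gate_b = b
      self.time_gate_on = True
    except Exception:
      # Anything that is not a usable [a, b] pair disables the gate.
      self.time_gate_on = False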

  # }}}

  # Read-only status {{{
  @property
  def device_connected(self):
    """bool: ``True`` if LINCam Capture has an active connection to the hardware."""
    return self.read_settings.device_connected

  @property
  def device_sn(self):
    """str: Hardware serial number reported by the device."""
    return self.read_settings.device_sn

  @property
  def device_uptime(self):
    """str: Human-readable uptime of the LINCam Capture application."""
    return self.read_settings.device_uptime

  @property
  def mcp_cps(self):
    """float: Total MCP anode count rate in Hz (all photon events, pre-filter)."""
    return self.read_settings.mcp_cps

  @property
  def stop_cps(self):
    """float: CFD stop-channel count rate in Hz."""
    return self.read_settings.stop_cps

  @property
  def acquired_cps(self):
    """float: Net acquired photon count rate in Hz (after all filters/gates)."""
    return self.read_settings.acquired_cps

  @property
  def fpga_temperature(self):
    """float: FPGA die temperature in °C."""
    return self.read_settings.fpga_temperature

  @property
  def detector_temperature(self):
    """float: Detector housing temperature in °C."""
    return self.read_settings.detector_temperature

  @property
  def photocathode_on_status(self):
    """bool: Actual photocathode high-voltage state as reported by the hardware.

    Unlike :attr:`photocathode_on` (the commanded state), this reflects
    whether the HV is actually present.  The overexposure protection
    circuit can disable it even when :attr:`photocathode_on` is ``True``.
    """
    return self.read_settings.photocathode_on_status

is_recording property writable

bool: Whether the device is currently writing photon data to disk.

setup_recording(filename, filenumber=0, seconds=0, continious=False)

Stage file recording parameters (call sync() to apply).

After calling this method, set is_recording to True and call sync() to start saving photon data to disk.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| filename | str | Base path for the output .photons file. The device appends filenumber and a timestamp. | required |
| filenumber | int | Numeric suffix appended to the filename. | 0 |
| seconds | int | Chunk duration in seconds. 0 means record until is_recording is set to False. | 0 |
| continious | bool | If True, the device automatically starts a new chunk file when the current one reaches seconds. | False |
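
The recording workflow described above (stage the parameters, set is_recording, then call sync()) can be sketched as follows. This is a minimal sketch, not a helper shipped with the library: `start_recording` and `FakeCam` are hypothetical names, `"run_a"` is a placeholder path, and `FakeCam` only imitates the staging behaviour so the snippet runs without hardware. With a real device you would pass a `LINCamRemote` instance instead.

```python
class FakeCam:
    """Hypothetical stand-in that mimics the staging behaviour of LINCamRemote."""

    def __init__(self):
        self.staged = {}   # changes waiting for sync()
        self.applied = {}  # state after the last sync()

    def setup_recording(self, filename, filenumber=0, seconds=0, continious=False):
        self.staged.update(
            file_path=filename,
            file_number=filenumber,
            record_chunk=seconds,
            record_continious=continious,
        )

    @property
    def is_recording(self):
        return self.applied.get("is_recording", False)

    @is_recording.setter
    def is_recording(self, value):
        self.staged["is_recording"] = value

    def sync(self):
        # Commit everything that was staged and clear the pending changes.
        self.applied.update(self.staged)
        self.staged = {}


def start_recording(cam, filename, seconds=0):
    """Stage the recording parameters and the start flag, then commit them together."""
    cam.setup_recording(filename, seconds=seconds)
    cam.is_recording = True
    cam.sync()
    return cam.is_recording


cam = FakeCam()
started = start_recording(cam, "run_a", seconds=10)
print(started)
```

Staging the parameters and the start flag before a single sync() keeps them in one batch, which matches the staging pattern the class uses for all settable properties.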
Source code in LINCam/LINCamRemote.py
def setup_recording(
  self,
  filename: str,
  filenumber: int = 0,
  seconds: int = 0,
  continious: bool = False,
):
  """Stage file recording parameters (call :meth:`sync` to apply).

  After calling this method set :attr:`is_recording` to ``True`` and
  call :meth:`sync` to start saving photon data to disk.

  Args:
      filename: Base path for the output ``.photons`` file.
          The device appends ``filenumber`` and a timestamp.
      filenumber: Numeric suffix appended to the filename (default 0).
      seconds: Chunk duration in seconds.  ``0`` means record until
          :attr:`is_recording` is set to ``False``.
      continious: If ``True`` the device automatically starts a new chunk
          file when the current one reaches ``seconds``.
  """
  s = self.new_settings
  s.file_path = filename
  s.file_number = filenumber
  s.record_chunk = seconds
  s.record_continious = continious

Status (Read-Only)

All status properties are updated on every sync() call.

| Property | Type | Description |
|---|---|---|
| device_connected | bool | Hardware connection active |
| device_sn | str | Device serial number |
| device_uptime | str | Application uptime |
| photocathode_on_status | bool | Actual HV state (may differ from commanded) |
| mcp_cps | float | Total MCP count rate (Hz) |
| stop_cps | float | CFD stop-channel rate (Hz) |
| acquired_cps | float | Net acquired rate after gates (Hz) |
| fpga_temperature | float | FPGA temperature (°C) |
| detector_temperature | float | Detector temperature (°C) |
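
Because status properties only change when sync() is called, waiting for a status change means calling sync() in a loop. The sketch below polls device_connected with a timeout. `wait_until_connected` and `FakeCam` are hypothetical names introduced for illustration, and the timeout and poll interval are arbitrary example values. `FakeCam` stands in for a `LINCamRemote` instance so the snippet runs without a device.

```python
import time


def wait_until_connected(cam, timeout_s=5.0, poll_s=0.2):
    """Call sync() until device_connected is True or the timeout expires."""
    deadline = time.monotonic() + timeout_s
    while True:
        cam.sync()  # status fields are only refreshed by sync()
        if cam.device_connected:
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_s)


class FakeCam:
    """Hypothetical stand-in: reports a connection after a few sync() calls."""

    def __init__(self, syncs_needed):
        self.syncs_needed = syncs_needed
        self.sync_calls = 0
        self.device_connected = False

    def sync(self):
        self.sync_calls += 1
        self.device_connected = self.sync_calls >= self.syncs_needed


cam = FakeCam(syncs_needed=3)
connected = wait_until_connected(cam, timeout_s=1.0, poll_s=0.0)
print(connected, cam.sync_calls)
```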

Remote control handle for a LINCam TCSPC camera.

Wraps the gRPC LINCamRemote service exposed by the LINCam Capture application. All settable parameters are Python properties that stage changes locally; call sync() to commit and read back device state.

After construction the object immediately calls sync() so that all read-only properties (serial number, count rates, temperatures, …) are populated.

Attributes:

| Name | Type | Description |
|---|---|---|
| hist_2d | numpy.ndarray | Last acquired 2-D spatial histogram as a square numpy array. Updated by sync_histograms(). |
| hist_dt | numpy.ndarray | Last acquired timing histogram (delta-t) as a 1-D numpy array. Updated by sync_histograms(). |
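
The size of hist_2d depends on how sync_histograms() maps the requested xy_bins onto a supported resolution. The sketch below mirrors the branch order of that method in plain Python. It assumes the request codes 5 down to 1 correspond to 256 up to 4096 pixels, in the order the docstring lists them; the server-side meaning of the codes is not shown in the source. `SUPPORTED_EDGES` and `effective_edge` are hypothetical names.

```python
SUPPORTED_EDGES = (256, 512, 1024, 2048, 4096)


def effective_edge(xy_bins):
    """Return the edge length the branch chain in sync_histograms would select."""
    if xy_bins is None:
        return 0  # timing histogram only, no 2-D image requested
    for edge in SUPPORTED_EDGES:
        if xy_bins <= edge:
            return edge  # smallest supported size that is not below the request
    return SUPPORTED_EDGES[-1]  # larger requests fall into the last branch


for requested in (100, 256, 300, 4096, 10000):
    print(requested, "->", effective_edge(requested))
```

Under this assumption a request between two supported sizes yields the larger one, so it is safest to read the actual size from hist_2d.shape after the call.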


device_connected property

bool: True if LINCam Capture has an active connection to the hardware.

device_sn property

str: Hardware serial number reported by the device.

device_uptime property

str: Human-readable uptime of the LINCam Capture application.

photocathode_on_status property

bool: Actual photocathode high-voltage state as reported by the hardware.

Unlike photocathode_on (the commanded state), this reflects whether the HV is actually present. The overexposure protection circuit can disable it even when photocathode_on is True.
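
Comparing the commanded and actual states is a simple way to notice that the HV request was not honoured. The sketch below reproduces the three labels used by the class's `__str__` method ("off", "inactive", "ACTIVE"). `photocathode_state` and `hv_request_not_honoured` are hypothetical helper names, not part of the library.

```python
def photocathode_state(commanded, actual):
    """Classify the photocathode with the same labels LINCamRemote.__str__ prints."""
    if actual:
        return "ACTIVE"    # HV is present
    if commanded:
        return "inactive"  # HV was requested but is not present
    return "off"


def hv_request_not_honoured(commanded, actual):
    """True when HV was requested but the hardware reports it as absent."""
    return bool(commanded) and not bool(actual)


print(photocathode_state(True, False))
print(hv_request_not_honoured(True, False))
```

With a real device the arguments would be `cam.photocathode_on` and `cam.photocathode_on_status`, read after a `cam.sync()`.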

mcp_cps property

float: Total MCP anode count rate in Hz (all photon events, pre-filter).

stop_cps property

float: CFD stop-channel count rate in Hz.

acquired_cps property

float: Net acquired photon count rate in Hz (after all filters/gates).
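
Since mcp_cps counts all events before filtering and acquired_cps counts what remains afterwards, their ratio gives a rough idea of how much a gate or filter removes. The sketch below is an illustration only: `acquired_fraction` is a hypothetical helper, the input numbers are made up, and it assumes both rates come from the same sync() call.

```python
def acquired_fraction(mcp_cps, acquired_cps):
    """Fraction of MCP events that remain after filters and gates.

    Returns None when there are no MCP events, to avoid dividing by zero.
    """
    if mcp_cps <= 0:
        return None
    return acquired_cps / mcp_cps


fraction = acquired_fraction(200_000.0, 50_000.0)
print(fraction)
```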

fpga_temperature property

float: FPGA die temperature in °C.

detector_temperature property

float: Detector housing temperature in °C.