Commit 4890e875 authored by Nicolas Nunez Barreto's avatar Nicolas Nunez Barreto

agrego modulo para leer una red pitaya en el canal analogico rapido 1

parent ead0f33d
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Jul 13 15:25:50 2021
@author: liaf-ankylosaurus-admin
"""
import sys
import redpitaya_scpi as scpi
from time import sleep
def ReadVoltage():
rp_s = scpi.scpi('192.168.1.244')
rp_s.tx_txt('ACQ:START')
rp_s.tx_txt('ACQ:TRIG NOW')
while 1:
rp_s.tx_txt('ACQ:TRIG:STAT?')
if rp_s.rx_txt() == 'TD':
break
rp_s.tx_txt('ACQ:SOUR1:DATA?')
buff_string = rp_s.rx_txt()
buff_string = buff_string.strip('{}\n\r').replace(" ", "").split(',')
buff = list(map(float, buff_string))
return np.mean(buff)
if __name__ == "__main__":
for i in range(50):
print(ReadVoltage())
sleep(2)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Jul 13 15:24:53 2021
@author: liaf-ankylosaurus-admin
"""
"""SCPI access to Red Pitaya."""
import socket
__author__ = "Luka Golinar, Iztok Jeras"
__copyright__ = "Copyright 2015, Red Pitaya"
class scpi (object):
"""SCPI class used to access Red Pitaya over an IP network."""
delimiter = '\r\n'
def __init__(self, host, timeout=None, port=5000):
"""Initialize object and open IP connection.
Host IP should be a string in parentheses, like '192.168.1.100'.
"""
self.host = host
self.port = port
self.timeout = timeout
try:
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if timeout is not None:
self._socket.settimeout(timeout)
self._socket.connect((host, port))
except socket.error as e:
print('SCPI >> connect({:s}:{:d}) failed: {:s}'.format(host, port, e))
def __del__(self):
if self._socket is not None:
self._socket.close()
self._socket = None
def close(self):
"""Close IP connection."""
self.__del__()
def rx_txt(self, chunksize = 4096):
"""Receive text string and return it after removing the delimiter."""
msg = ''
while 1:
chunk = self._socket.recv(chunksize + len(self.delimiter)).decode('utf-8') # Receive chunk size of 2^n preferably
msg += chunk
if (len(chunk) and chunk[-2:] == self.delimiter):
break
return msg[:-2]
def rx_arb(self):
numOfBytes = 0
""" Recieve binary data from scpi server"""
str=''
while (len(str) != 1):
str = (self._socket.recv(1))
if not (str == '#'):
return False
str=''
while (len(str) != 1):
str = (self._socket.recv(1))
numOfNumBytes = int(str)
if not (numOfNumBytes > 0):
return False
str=''
while (len(str) != numOfNumBytes):
str += (self._socket.recv(1))
numOfBytes = int(str)
str=''
while (len(str) != numOfBytes):
str += (self._socket.recv(1))
return str
def tx_txt(self, msg):
"""Send text string ending and append delimiter."""
return self._socket.send((msg + self.delimiter).encode('utf-8'))
def txrx_txt(self, msg):
"""Send/receive text string."""
self.tx_txt(msg)
return self.rx_txt()
# IEEE Mandated Commands
def cls(self):
"""Clear Status Command"""
return self.tx_txt('*CLS')
def ese(self, value: int):
"""Standard Event Status Enable Command"""
return self.tx_txt('*ESE {}'.format(value))
def ese_q(self):
"""Standard Event Status Enable Query"""
return self.txrx_txt('*ESE?')
def esr_q(self):
"""Standard Event Status Register Query"""
return self.txrx_txt('*ESR?')
def idn_q(self):
"""Identification Query"""
return self.txrx_txt('*IDN?')
def opc(self):
"""Operation Complete Command"""
return self.tx_txt('*OPC')
def opc_q(self):
"""Operation Complete Query"""
return self.txrx_txt('*OPC?')
def rst(self):
"""Reset Command"""
return self.tx_txt('*RST')
def sre(self):
"""Service Request Enable Command"""
return self.tx_txt('*SRE')
def sre_q(self):
"""Service Request Enable Query"""
return self.txrx_txt('*SRE?')
def stb_q(self):
"""Read Status Byte Query"""
return self.txrx_txt('*STB?')
# :SYSTem
def err_c(self):
"""Error count."""
return rp.txrx_txt('SYST:ERR:COUN?')
def err_c(self):
"""Error next."""
return rp.txrx_txt('SYST:ERR:NEXT?')
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment