Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
P
pyLIAF
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Nicolas Nunez Barreto
pyLIAF
Commits
9626c012
Commit
9626c012
authored
Jul 14, 2021
by
Nicolas Nunez Barreto
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
borre un archivo y lo condense en read analog
parent
4890e875
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
138 additions
and
147 deletions
+138
-147
Read_analog.py
pyLIAF/RedPitaya/Read_analog.py
+138
-3
redpitaya_scpi.py
pyLIAF/RedPitaya/redpitaya_scpi.py
+0
-144
No files found.
pyLIAF/RedPitaya/Read_analog.py
View file @
9626c012
...
...
@@ -5,15 +5,150 @@ Created on Tue Jul 13 15:25:50 2021
@author: liaf-ankylosaurus-admin
"""
import
socket
import
sys
import
redpitaya_scpi
as
scpi
#
import redpitaya_scpi as scpi
from
time
import
sleep
import
numpy
as
np
__author__
=
"Luka Golinar, Iztok Jeras"
__copyright__
=
"Copyright 2015, Red Pitaya"
class
scpi
(
object
):
"""SCPI class used to access Red Pitaya over an IP network."""
delimiter
=
'
\r\n
'
def
__init__
(
self
,
host
,
timeout
=
None
,
port
=
5000
):
"""Initialize object and open IP connection.
Host IP should be a string in parentheses, like '192.168.1.100'.
"""
self
.
host
=
host
self
.
port
=
port
self
.
timeout
=
timeout
try
:
self
.
_socket
=
socket
.
socket
(
socket
.
AF_INET
,
socket
.
SOCK_STREAM
)
if
timeout
is
not
None
:
self
.
_socket
.
settimeout
(
timeout
)
self
.
_socket
.
connect
((
host
,
port
))
except
socket
.
error
as
e
:
print
(
'SCPI >> connect({:s}:{:d}) failed: {:s}'
.
format
(
host
,
port
,
e
))
def
__del__
(
self
):
if
self
.
_socket
is
not
None
:
self
.
_socket
.
close
()
self
.
_socket
=
None
def
close
(
self
):
"""Close IP connection."""
self
.
__del__
()
def
rx_txt
(
self
,
chunksize
=
4096
):
"""Receive text string and return it after removing the delimiter."""
msg
=
''
while
1
:
chunk
=
self
.
_socket
.
recv
(
chunksize
+
len
(
self
.
delimiter
))
.
decode
(
'utf-8'
)
# Receive chunk size of 2^n preferably
msg
+=
chunk
if
(
len
(
chunk
)
and
chunk
[
-
2
:]
==
self
.
delimiter
):
break
return
msg
[:
-
2
]
def
rx_arb
(
self
):
numOfBytes
=
0
""" Recieve binary data from scpi server"""
str
=
''
while
(
len
(
str
)
!=
1
):
str
=
(
self
.
_socket
.
recv
(
1
))
if
not
(
str
==
'#'
):
return
False
str
=
''
while
(
len
(
str
)
!=
1
):
str
=
(
self
.
_socket
.
recv
(
1
))
numOfNumBytes
=
int
(
str
)
if
not
(
numOfNumBytes
>
0
):
return
False
str
=
''
while
(
len
(
str
)
!=
numOfNumBytes
):
str
+=
(
self
.
_socket
.
recv
(
1
))
numOfBytes
=
int
(
str
)
str
=
''
while
(
len
(
str
)
!=
numOfBytes
):
str
+=
(
self
.
_socket
.
recv
(
1
))
return
str
def
tx_txt
(
self
,
msg
):
"""Send text string ending and append delimiter."""
return
self
.
_socket
.
send
((
msg
+
self
.
delimiter
)
.
encode
(
'utf-8'
))
def
txrx_txt
(
self
,
msg
):
"""Send/receive text string."""
self
.
tx_txt
(
msg
)
return
self
.
rx_txt
()
# IEEE Mandated Commands
def
cls
(
self
):
"""Clear Status Command"""
return
self
.
tx_txt
(
'*CLS'
)
def
ese
(
self
,
value
:
int
):
"""Standard Event Status Enable Command"""
return
self
.
tx_txt
(
'*ESE {}'
.
format
(
value
))
def
ese_q
(
self
):
"""Standard Event Status Enable Query"""
return
self
.
txrx_txt
(
'*ESE?'
)
def
esr_q
(
self
):
"""Standard Event Status Register Query"""
return
self
.
txrx_txt
(
'*ESR?'
)
def
idn_q
(
self
):
"""Identification Query"""
return
self
.
txrx_txt
(
'*IDN?'
)
def
opc
(
self
):
"""Operation Complete Command"""
return
self
.
tx_txt
(
'*OPC'
)
def
opc_q
(
self
):
"""Operation Complete Query"""
return
self
.
txrx_txt
(
'*OPC?'
)
def
rst
(
self
):
"""Reset Command"""
return
self
.
tx_txt
(
'*RST'
)
def
sre
(
self
):
"""Service Request Enable Command"""
return
self
.
tx_txt
(
'*SRE'
)
def
sre_q
(
self
):
"""Service Request Enable Query"""
return
self
.
txrx_txt
(
'*SRE?'
)
def
stb_q
(
self
):
"""Read Status Byte Query"""
return
self
.
txrx_txt
(
'*STB?'
)
# :SYSTem
def
err_c
(
self
):
"""Error count."""
return
rp
.
txrx_txt
(
'SYST:ERR:COUN?'
)
def
err_c
(
self
):
"""Error next."""
return
rp
.
txrx_txt
(
'SYST:ERR:NEXT?'
)
def
ReadVoltage
():
rp_s
=
scpi
.
scpi
(
'192.168.1.244'
)
rp_s
=
scpi
(
'192.168.1.244'
)
rp_s
.
tx_txt
(
'ACQ:START'
)
rp_s
.
tx_txt
(
'ACQ:TRIG NOW'
)
...
...
pyLIAF/RedPitaya/redpitaya_scpi.py
deleted
100644 → 0
View file @
4890e875
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Jul 13 15:24:53 2021
@author: liaf-ankylosaurus-admin
"""
"""SCPI access to Red Pitaya."""
import
socket
__author__
=
"Luka Golinar, Iztok Jeras"
__copyright__
=
"Copyright 2015, Red Pitaya"
class
scpi
(
object
):
"""SCPI class used to access Red Pitaya over an IP network."""
delimiter
=
'
\r\n
'
def
__init__
(
self
,
host
,
timeout
=
None
,
port
=
5000
):
"""Initialize object and open IP connection.
Host IP should be a string in parentheses, like '192.168.1.100'.
"""
self
.
host
=
host
self
.
port
=
port
self
.
timeout
=
timeout
try
:
self
.
_socket
=
socket
.
socket
(
socket
.
AF_INET
,
socket
.
SOCK_STREAM
)
if
timeout
is
not
None
:
self
.
_socket
.
settimeout
(
timeout
)
self
.
_socket
.
connect
((
host
,
port
))
except
socket
.
error
as
e
:
print
(
'SCPI >> connect({:s}:{:d}) failed: {:s}'
.
format
(
host
,
port
,
e
))
def
__del__
(
self
):
if
self
.
_socket
is
not
None
:
self
.
_socket
.
close
()
self
.
_socket
=
None
def
close
(
self
):
"""Close IP connection."""
self
.
__del__
()
def
rx_txt
(
self
,
chunksize
=
4096
):
"""Receive text string and return it after removing the delimiter."""
msg
=
''
while
1
:
chunk
=
self
.
_socket
.
recv
(
chunksize
+
len
(
self
.
delimiter
))
.
decode
(
'utf-8'
)
# Receive chunk size of 2^n preferably
msg
+=
chunk
if
(
len
(
chunk
)
and
chunk
[
-
2
:]
==
self
.
delimiter
):
break
return
msg
[:
-
2
]
def
rx_arb
(
self
):
numOfBytes
=
0
""" Recieve binary data from scpi server"""
str
=
''
while
(
len
(
str
)
!=
1
):
str
=
(
self
.
_socket
.
recv
(
1
))
if
not
(
str
==
'#'
):
return
False
str
=
''
while
(
len
(
str
)
!=
1
):
str
=
(
self
.
_socket
.
recv
(
1
))
numOfNumBytes
=
int
(
str
)
if
not
(
numOfNumBytes
>
0
):
return
False
str
=
''
while
(
len
(
str
)
!=
numOfNumBytes
):
str
+=
(
self
.
_socket
.
recv
(
1
))
numOfBytes
=
int
(
str
)
str
=
''
while
(
len
(
str
)
!=
numOfBytes
):
str
+=
(
self
.
_socket
.
recv
(
1
))
return
str
def
tx_txt
(
self
,
msg
):
"""Send text string ending and append delimiter."""
return
self
.
_socket
.
send
((
msg
+
self
.
delimiter
)
.
encode
(
'utf-8'
))
def
txrx_txt
(
self
,
msg
):
"""Send/receive text string."""
self
.
tx_txt
(
msg
)
return
self
.
rx_txt
()
# IEEE Mandated Commands
def
cls
(
self
):
"""Clear Status Command"""
return
self
.
tx_txt
(
'*CLS'
)
def
ese
(
self
,
value
:
int
):
"""Standard Event Status Enable Command"""
return
self
.
tx_txt
(
'*ESE {}'
.
format
(
value
))
def
ese_q
(
self
):
"""Standard Event Status Enable Query"""
return
self
.
txrx_txt
(
'*ESE?'
)
def
esr_q
(
self
):
"""Standard Event Status Register Query"""
return
self
.
txrx_txt
(
'*ESR?'
)
def
idn_q
(
self
):
"""Identification Query"""
return
self
.
txrx_txt
(
'*IDN?'
)
def
opc
(
self
):
"""Operation Complete Command"""
return
self
.
tx_txt
(
'*OPC'
)
def
opc_q
(
self
):
"""Operation Complete Query"""
return
self
.
txrx_txt
(
'*OPC?'
)
def
rst
(
self
):
"""Reset Command"""
return
self
.
tx_txt
(
'*RST'
)
def
sre
(
self
):
"""Service Request Enable Command"""
return
self
.
tx_txt
(
'*SRE'
)
def
sre_q
(
self
):
"""Service Request Enable Query"""
return
self
.
txrx_txt
(
'*SRE?'
)
def
stb_q
(
self
):
"""Read Status Byte Query"""
return
self
.
txrx_txt
(
'*STB?'
)
# :SYSTem
def
err_c
(
self
):
"""Error count."""
return
rp
.
txrx_txt
(
'SYST:ERR:COUN?'
)
def
err_c
(
self
):
"""Error next."""
return
rp
.
txrx_txt
(
'SYST:ERR:NEXT?'
)
\ No newline at end of file
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment