-
Notifications
You must be signed in to change notification settings - Fork 24
Expand file tree
/
Copy pathXnatRepushScan
More file actions
294 lines (244 loc) · 9.87 KB
/
XnatRepushScan
File metadata and controls
294 lines (244 loc) · 9.87 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
Repush project scans from PACS that are missing on XNAT
Created April 26th, 2023
@author: William Duett, VUIIS CCI, Vanderbilt University
'''
# Libraries
import ast
import csv
import logging
import os
import pandas as pd
import pyxnat
import re
import requests
import shutil
import sys
import time
from dax import XnatUtils
DEFAULT_ARGUMENTS = {'host': None, 'username': None, 'project': None, 'subject': None, 'session': None, 'scan': None}
DESCRIPTION = """What is the script doing :
*Repush project scans from PACS that are missing on XNAT
Example:
*Push all missing examcards from given project
XnatRepushScan --proj PID --scan unknown
*Push all missing T1s from given project
XnatRepushScan --proj PID --scan T1
"""
def setup_info_logger(name):
"""
Using logger for the executables output.
Setting the information for the logger.
:param name: Name of the logger
:return: logging object
"""
handler = logging.StreamHandler()
logger = logging.getLogger(name)
logger.setLevel(logging.INFO)
logger.addHandler(handler)
return logger
def parse_args():
"""
Method to parse arguments base on ArgumentParser
:return: parser object
"""
from argparse import ArgumentParser, RawDescriptionHelpFormatter
argp = ArgumentParser(prog='XnatRepushScan', description=DESCRIPTION,
formatter_class=RawDescriptionHelpFormatter)
argp.add_argument('--host', dest='host', default=None,
help='Host for XNAT. Default: using $XNAT_HOST.')
argp.add_argument('-u', '--username', dest='username', default=None,
help='Username for XNAT. Default: using $XNAT_USER.')
argp.add_argument('--proj', dest='project', default=None,
help='Project to repush scans on')
argp.add_argument('--subj', dest='subject', default=None,
help='Subject to repush scans on')
argp.add_argument('--sess', dest='session', default=None,
help='Session to repush scans on')
argp.add_argument('--scan', dest='scan', default=None,
help='Scan type to repush')
return argp
def check_options():
"""
Method to check the options specified by the user
:return: True if OPTIONS are fine, False otherwise
"""
if not OPTIONS.project:
LOGGER.warn('ERROR: Project not specified. Use option --proj')
return False
if not OPTIONS.project:
LOGGER.warn('ERROR: Scan type not specified. Use option --scan')
return False
return True
def check_project(project):
"""
Method to check if the user has access to the project on XNAT
:param project: project to send scans to
:return: True if project exists on XNAT and is accessable
"""
LOGGER.info('Checking ' + project.label() + ' project exists on XNAT')
if not project.exists():
raise Exception('ERROR: Project ' + str(project) + ' NOT found in XNAT')
else:
LOGGER.info('Project ' + project.label() + ' found!')
def check_subject(subject):
"""
Method to check if the user has access to the subject on XNAT
:param subject: subject to send scans to
:return: True if subject exists on XNAT and is accessable
"""
LOGGER.info('Checking ' + OPTIONS.subject + ' subject exists on XNAT')
if not subject.exists():
raise Exception('ERROR: Subject ' + OPTIONS.subject + ' NOT found in XNAT')
else:
LOGGER.info('Subject ' + OPTIONS.subject + ' found!')
def check_session(session):
"""
Method to check if the user has access to the session on XNAT
:param session: session to send scans to
:return: True if session exists on XNAT and is accessable
"""
LOGGER.info('Checking ' + OPTIONS.session + ' session exists on XNAT')
if not session.exists():
raise Exception('ERROR: Session ' + OPTIONS.session + ' NOT found in XNAT')
else:
LOGGER.info('Session ' + OPTIONS.session + ' found!')
def repush(scans):
df = pd.DataFrame(scans)
df = df[['subject_label','session_label','scan_type','scan_label']]
LOGGER.info('*******************************************')
if OPTIONS.session:
SESSION = OPTIONS.session
else:
SESSION = session['session_label']
LOGGER.info(SESSION)
patient_id = SESSION.split('_')
request = "http://10.109.20.19:8080/dcm4chee-arc/aets/DCM4CHEE57/rs/studies/?PatientID=*{}".format(patient_id[1])
res = requests.get(request)
response = str(res.content).split('},')
url = response[8].split('["')
study_url = url[1].split('"]')
study_url = study_url[0] + '/series'
res = requests.get(study_url)
scan_id = 9876543210
flag = 0
url_save = ''
for x in res:
if '00081190' in str(x):
x = str(x).split('["')
url = x[1].replace('"]','')
elif '00200011' in str(x):
x = str(x).split('["')
scan_id = str(x).split(':[')
scan_id = scan_id[1].replace("]']",'')
scan_id = scan_id.split(']}')
print(scan_id[0])
print('----------------------')
print(str(url))
print(scan_id)
print(flag)
if scan_id == '0' and flag == 0:
url_save = str(url) + '/{}/instances?includefield=all'
print('000000000000000000000000000')
print(url_save)
scan_id_save = scan_id
flag = 1
# print('THIS IS WHERE WE REPUSH WHATEVER SCAN')
def main_display():
"""
Main display of the executables before any process
:return: None
"""
print('################################################################')
print('# XnatRepushScan #')
print('# #')
print('# Developed by the MASI Lab Vanderbilt University, TN, USA. #')
print('# If issues, please start a thread here: #')
print('# https://groups.google.com/forum/#!forum/vuiis-cci #')
print('# Usage: #')
print('# Repush project scans from PACS that are missing on XNAT #')
print('# #')
print('# Parameters : #')
if vars(OPTIONS) == DEFAULT_ARGUMENTS:
print('# No Arguments given #')
print('# See the help below or Use "XnatRepushScan -h" #')
print('################################################################\n')
PARSER.print_help()
sys.exit()
else:
if OPTIONS.host:
print('# %*s -> %*s#' % (
-20, 'XNAT Host', -33, get_proper_str(OPTIONS.host)))
if OPTIONS.username:
print('# %*s -> %*s#' % (
-20, 'XNAT User', -33, get_proper_str(OPTIONS.username)))
if OPTIONS.project:
print('# %*s -> %*s#' % (
-20, 'Project ', -33, get_proper_str(OPTIONS.project)))
if OPTIONS.subject:
print('# %*s -> %*s#' % (
-20, 'Subject Type ', -33, get_proper_str(OPTIONS.subject)))
if OPTIONS.session:
print('# %*s -> %*s#' % (
-20, 'Session Type ', -33, get_proper_str(OPTIONS.session)))
if OPTIONS.scan:
print('# %*s -> %*s#' % (
-20, 'Scan Type ', -33, get_proper_str(OPTIONS.scan)))
print('################################################################')
def get_proper_str(str_option, end=False):
"""
Method to shorten a string into the proper size for display
:param str_option: string to shorten
:param end: keep the end of the string visible (default beginning)
:return: shortened string
"""
if len(str_option) > 32:
if end:
return '...' + str_option[-29:]
else:
return str_option[:29] + '...'
else:
return str_option
if __name__ == '__main__':
LOGGER = setup_info_logger('XnatRepushScan')
PARSER = parse_args()
OPTIONS = PARSER.parse_args()
main_display()
SHOULD_RUN = check_options()
if SHOULD_RUN:
if OPTIONS.host:
host = OPTIONS.host
else:
host = os.environ['XNAT_HOST']
user = OPTIONS.username
with XnatUtils.get_interface(host=host, user=user) as XNAT:
repush_proj = XNAT.select.project(OPTIONS.project)
check_project(repush_proj)
if OPTIONS.subject:
repush_subj = XNAT.select_subject(OPTIONS.project,OPTIONS.subject)
check_subject(repush_subj)
if OPTIONS.session:
repush_sess = XNAT.select_session(OPTIONS.project,OPTIONS.subject,OPTIONS.session)
check_session(repush_sess)
else:
repush_sess = None
else:
repush_subj = None
if repush_subj:
if repush_sess:
scans = XNAT.get_scans(OPTIONS.project, OPTIONS.subject, OPTIONS.session)
repush(scans)
else:
sessions = XNAT.get_sessions(OPTIONS.project, OPTIONS.subject)
for session in sessions:
scans = XNAT.get_scans(OPTIONS.project, OPTIONS.subject, session['label'])
repush(scans)
else:
subjects = XNAT.get_subjects(OPTIONS.project)
for subject in subjects:
sessions = XNAT.get_sessions(OPTIONS.project, subject['label'])
for session in sessions:
scans = XNAT.get_scans(OPTIONS.project, subject['label'], session['label'])
repush(scans)