#! /bin/env python
"""Solar Orbiter EDDS SOAP Web services library

- launch automated batch requests on EDDS server and download TM/TC result files
- launch automated stream request and run Java StreamClient to download streamed data
- launch Java StreamClient to recover data from on existing data stream request
"""

import sys
import os
import time
import subprocess
import platform

import isodate
import zeep
from configobj import ConfigObj

# Refuse to run on Python 2 : the module uses Python-3-only semantics.
min_python_version = (3, 0, 0)
cur_python_version = sys.version_info[:3]
if cur_python_version < min_python_version:
    print("ERROR : python version %d.%d.%d < %d.%d.%d"
          % (cur_python_version + min_python_version))
    sys.exit(100)

## Stream client config file and launcher script
#
CONFIG_FILE = "config.properties"
STREAM_CLIENT = "StreamClientLauncher.sh"

## Delay (seconds) between two polls when waiting for batch response files
#
BATCH_DELAY = 5

## Default request duration
#
DEFAULT_DURATION = isodate.Duration(days=1)


def now():
    """ Return current time string, e.g. "[12:34:56]" """
    return "[%s]" % time.strftime("%T")


def load_properties():
    """ Read config file

    Exits with status 100 when CONFIG_FILE cannot be read.
    """
    try:
        properties = ConfigObj(CONFIG_FILE, file_error=True)
    except OSError as err:
        print("ERROR :", err)
        sys.exit(100)
    print("Reading config from", CONFIG_FILE)
    return properties


def update_properties(job_id):
    """ Update config file to set requestId = job_id

    Rewrites CONFIG_FILE on disk so the Java StreamClient picks up the new id.
    """
    properties["requestId"] = job_id
    properties.write()
    print("Update %s with requestId = %s" % (CONFIG_FILE, properties["requestId"]))


# Module-level setup : read config, then connect to the EDDS server.
properties = load_properties()
WSDL = properties["eddsServerAddress"]

## Read wsdl to discover web-services
#
client = zeep.Client(WSDL)

## Dynamically create some data types to create various requests
#
User = client.get_type("ns0:User")
ContextPart = client.get_type("ns0:ContextPart")
DataRequest = client.get_type("ns0:DataRequest")
StreamRequest = client.get_type("ns0:StreamRequest")
BatchRequest = client.get_type("ns0:BatchRequest")
CancelPart = client.get_type("ns0:CancelPart")
JobIdPart = client.get_type("ns0:JobIdPart")
TimeRange = client.get_type("ns0:TimeRange")

## Authentication and context parameters
#
USERNAME = properties["username"]
_PASSWORD = properties["password"]
ROLE = "SWA_Role"

USER = User(UserName=USERNAME, Role=ROLE)
CONTEXT_STREAM = ContextPart(MissionId="SOL", DomainId="0")
CONTEXT_BATCH = ContextPart(MissionId="SOL", DomainId="0", PrivacyTag="PRIVATE")

## Request status : wait for results available
#
STATE_WAIT = ["ACTIVE", "SUBMITTED", "QUEUED", "SERVER_COMPLETED"]

## Request status : we can kill, no results available or error
#
STATE_KILL = ["CANCELED", "SUSPENDED", "COMPLETED_NO_RESULTS",
              "ERROR_LOCALLY_DELIVERED", "ERROR_ACCESS_DENIED",
              "ERROR_INVALID_REQUEST", "ERROR_LIMIT_EXCEEDED",
              "ERROR_SERVER_NOT_AVAILABLE", "ERROR_DELIVERY",
              "ERROR_UNKNOWN"]

## Request status : results available
#
STATE_DONE = ["DELIVERED", "DELIVERED_RESP_DELETED", "DELIVERED_PARTIAL_RESULTS"]


def login():
    """ Login to EDDS server """
    status = client.service.logIn(UserName=USERNAME, Password=_PASSWORD)
    print("Login EDDS", status["SessionId"])


def make_output_filename(dataset, start, duration, extension):
    """ Computes output filenames

    dataset   : short dataset tag, e.g. "swa-batch-tm"
    start     : isodate.datetime, start of the time range
    duration  : isodate.Duration, length of the time range
    extension : file extension without the dot, e.g. "bin"

    >> start = isodate.parse_datetime ("2020-04-04T10:00:00")
    >> duration = isodate.parse_duration ("PT8H")
    # set in config.properties outbox = /home/solar/EDDS/output
    >> make_output_filename ("swa-batch-tm", start, duration, "bin")
    /home/solar/EDDS/output/solo_L0_swa-batch-tm_20200404_100000_20200404_180000_V00.bin
    """
    node_name = platform.uname().node
    prefix = "solo_L0"
    stop = start + duration
    time_fmt = "%Y%m%d_%H%M%S"
    s1, s2 = (dt.strftime(time_fmt) for dt in (start, stop))
    if node_name == "rosina1.irap.omp.eu":
        # Specific output file naming at IRAP
        directory = ""
        pattern = "{s1}_{s2}.{dataset}.{extension}"
    else:
        directory = properties["outbox"]
        pattern = "{prefix}_{dataset}_{s1}_{s2}_V00.{extension}"
    basename = pattern.format(prefix=prefix, dataset=dataset,
                              s1=s1, s2=s2, extension=extension)
    return os.path.join(directory, basename)


def log_req(job_id, req):
    """ Save request as .xml file

    Serializes the SOAP message that would be sent for `req` into
    "<job_id>.xml" (stream and batch requests are wrapped differently).
    """
    if "StreamDataRequest" in req:
        message = client.create_message(client.service, "streamRequest",
                                        ContextPart=CONTEXT_STREAM,
                                        StreamRequest=req)
    if "DataRequest" in req:
        message = client.create_message(client.service, "batchRequest",
                                        ContextPart=CONTEXT_BATCH,
                                        BatchRequest=req)
    tree = message.getroottree()
    output_file = job_id + ".xml"
    tree.write(file=output_file, pretty_print=True)
    print("Save request file :", output_file)


def schedule_with_expiry(duration):
    """ Schedule for stream requests : immediate with given duration before expirying

    duration : isodate.Duration()
    """
    ScheduleWithExpiry = client.get_type("ns0:ScheduleWithExpiry")
    OnceSchedule = client.get_type("ns0:OnceSchedule")
    s = ScheduleWithExpiry(
        Immediate="true",
        RequestExpiryDate=OnceSchedule(RelativeTime=duration),
    )
    return s


def create_stream_TM_request(duration, include_raw_data=True, **filters):
    """ Create a StreamRequest to retrieve PktTmStream data, for a given duration

    duration: isodate interval

    filtering TM is possible:
        pids="95-99"        all SWA TM data
        apids="1520:1599"   all SWA TM data

    Automatically adding "sendRawData=true" to get packet TM content

    >> req = create_stream_TM_request (
        isodate.parse_duration ("P2W"), True, pids = "95-99")
    """
    PktTmStream = client.get_type("ns0:PktTmStream")

    FILTER_KEYS = ["pids", "notpids", "apids", "notapids", "spids", "notspids"]
    for key in filters:
        if key not in FILTER_KEYS:
            print("ERROR : filter key %s not allowed" % key)
            return None

    if include_raw_data:
        filters["sendRawData"] = "true"

    # Build "key=value;key=value" filter string expected by the server.
    filter_string = ";".join("%s=%s" % kv for kv in filters.items())

    req = StreamRequest(
        Comment="PARC stream TM request",
        Schedule=schedule_with_expiry(duration),
        User=USER,
        StreamDataRequest=PktTmStream(
            DataSource="PARC",
            Filter=filter_string
        )
    )
    return req


def create_stream_TC_request(duration):
    """ Create a StreamRequest to retrieve PktTmStream data, for a given duration

    duration: isodate.Duration  eg: isodate.Duration (hours = 1)

    CAUTION: filtering don't seem to work, and is not implemented in GUI

    >> create_stream_TC_request (isodate.parse_duration ("P1D"))
    >> create_stream_TC_request (isodate.Duration (months=1))
    """
    PktTcStream = client.get_type("ns0:PktTcStream")
    req = StreamRequest(
        Comment="PARC stream TC request",
        Schedule=schedule_with_expiry(duration),
        User=USER,
        StreamDataRequest=PktTcStream(
            DataSource="PARC",
            Filter="pids=95-99;sendRawData=true"
        )
    )
    return req


def ack_delivery():
    """ Create default AckDelivery() """
    AckDelivery = client.get_type("ns0:AckDelivery")
    EmailDelivery = client.get_type("ns0:EmailDelivery")
    return AckDelivery(EmailDelivery=EmailDelivery(EmailList={}, Default=True))


def post_processing():
    """ Create default RequestPostProcessing()

    DataCompression = "ZIP|TAR|TARGZ" should be used
    """
    RequestPostProcessing = client.get_type("ns0:RequestPostProcessing")
    return RequestPostProcessing(DataCompression="NONE", DataEncrypting="NONE")


def server_delivery():
    """ Create default Delivery() """
    Delivery = client.get_type("ns0:Delivery")
    ServerDelivery = client.get_type("ns0:ServerDelivery")
    return Delivery(ServerDelivery=ServerDelivery())


def report_formatting(format):
    """ Create TC/TM report format

    format : "XML","RAWSOURCEBINARY"
    """
    ReportFormatting = client.get_type("ns0:ReportFormatting")
    return ReportFormatting(Format=format)


def schedule_once_immediate():
    """ Request schedule : once and immediate """
    ScheduleOnce = client.get_type("ns0:ScheduleOnce")
    return ScheduleOnce(Immediate="true")


def create_batch_Param_request(start, duration, params=None):
    """ Create a batch Param request for a list of Param identifiers

    params : list of parameter identifier strings (default : empty list)

    >> create_batch_param (
        isodate.parse_datetime ("2020-03-26T00:00"),
        isodate.parse_duration ("P1D"),
        params = ["NRU00001", "NRU00002"])
    """
    if params is None:
        params = []

    Param = client.get_type("ns0:Param")
    ParamTmFilter = client.get_type("ns0:ParamTmFilter")
    ParamTmFormatting = client.get_type("ns0:ParamTmFormatting")
    ParamNameList = client.get_type("ns0:ParamNameList")
    DeliveryRange = client.get_type("ns0:DeliveryRange")

    # "None" is a reserved word in Python : set the DeliveryRange.None
    # attribute through setattr.
    delivery_range = DeliveryRange()
    setattr(delivery_range, "None", "None")

    req = BatchRequest(
        Comment="Param batch request",
        Schedule=schedule_once_immediate(),
        User=USER,
        DataRequest=DataRequest(
            Param=Param(
                DataSource="DARC",
                RequestPostProcessing=post_processing(),
                Delivery=server_delivery(),
                ParamTmFilter=ParamTmFilter(
                    ParamNameList=ParamNameList(params),
                    TimeRange=TimeRange(StartTime=start, Duration=duration),
                    DeliveryRange=delivery_range,
                    OnChange=False,
                    SuperCommutationFlag=False,
                    RepresentationSelection="ENG"
                ),
                ParamTmFormat=ParamTmFormatting(Format="XML"),
            )
        ),
        AckDelivery=ack_delivery()
    )
    return req


def create_batch_TM_report_request(start, duration, apids=(1520, 1599)):
    """ Create a TM report batch request for a given start time and duration

    start : isodate.datetime
    duration : isodate.Duration
    apids : tuple (first_apid, last_apid)

    >> create_batch_TM_report_request (
        isodate.parse_datetime ("2020-03-26T00:00"),
        isodate.parse_duration ("P1D"),
        apids = (1520, 1599))
    """
    PktTmReport = client.get_type("ns0:PktTmReport")
    TmPacketFilter = client.get_type("ns0:TmPacketFilter")
    PacketNameList = client.get_type("ns0:PacketNameList")
    PacketName = client.get_type("ns0:PacketName")

    req = BatchRequest(
        Comment="TM report batch request",
        Schedule=schedule_once_immediate(),
        User=USER,
        DataRequest=DataRequest(
            PktTmReport=PktTmReport(
                DataSource="PARC",
                RequestPostProcessing=post_processing(),
                Delivery=server_delivery(),
                ReportFormat=report_formatting("RAWSOURCEBINARY"),
                TmPacketFilter=TmPacketFilter(
                    TimeRange=TimeRange(StartTime=start, Duration=duration),
                    PacketNameList=PacketNameList(
                        PacketNameListElement=PacketName(Apid="%d-%d" % apids),
                    ),
                    TimeFiltering="GENERATION_TIME",
                    Dataspace="PARC_FLIGHT_SOL",
                    BriefReportFlag=True,
                    IncludeRawData=True,
                ),
            ),
        ),
        AckDelivery=ack_delivery()
    )
    return req


def TC_filter_list(pids):
    """ Filter TC for Pid in a given range

    pids : tuple (pid_min, pid_max)
    => Filtering with (Pid >= pid_min) and (Pid <= pid_max)
    """
    TcFilterList = client.get_type("ns0:TcFilterList")
    TcFilterElement = client.get_type("ns0:TcFilterElement")
    PktTcFilterKeyword = client.get_type("ns0:PktTcFilterKeyword")
    Operation = client.get_type("ns0:Operation")

    pid_min, pid_max = pids
    filters = [
        TcFilterElement(PktTcFilterKeyword(Pid=pid_min), Operation("OP_GTE")),
        TcFilterElement(PktTcFilterKeyword(Pid=pid_max), Operation("OP_LTE"))
    ]
    return TcFilterList(filters)


def create_batch_TC_report_request(start, duration, pids=(95, 99)):
    """ Create a TC report batch request for a given start time and duration

    start : isodate.datetime
    duration : isodate.Duration
    pids : PIDs tuple (first, last)

    CAUTION: format RAWSOURCEBINARY not available for TC
    => EDDS server error : not implemented

    NOTE: TimeFiltering should be:
    - RELEASE_TIME : time when TC is send from ground
    - EXECUTION_TIME : time when tC is executed
    A command should be sent from ground a given D day,
    and executed at D+1, or D+2

    >> create_batch_TC_report_request (
        isodate.datetime (2002, 3, 20, 12, 00, 00),
        isodate.Duration (days = 1),
        pids = (95, 99))
    """
    PktTcReport = client.get_type("ns0:PktTcReport")
    TcPacketFilter = client.get_type("ns0:TcPacketFilter")

    req = BatchRequest(
        Comment="TC report batch request",
        Schedule=schedule_once_immediate(),
        User=USER,
        DataRequest=DataRequest(
            PktTcReport=PktTcReport(
                DataSource="PARC",
                RequestPostProcessing=post_processing(),
                Delivery=server_delivery(),
                ReportFormat=report_formatting("XML"),
                TcPacketFilter=TcPacketFilter(
                    TimeRange=TimeRange(StartTime=start, Duration=duration),
                    TcFilterList=TC_filter_list(pids),
                    #TimeFiltering = "RELEASE_TIME",
                    TimeFiltering="EXECUTION_TIME",
                    Dataspace="PARC_FLIGHT_SOL",
                    BriefReportFlag=True,
                    IncludeRawData=True,
                ),
            ),
        ),
        AckDelivery=ack_delivery()
    )
    return req


def send_stream_request(req):
    """ Send a stream request to EDDS server

    Return a job_id
    """
    job = client.service.streamRequest(ContextPart=CONTEXT_STREAM, StreamRequest=req)
    print("send_stream_request :", job)
    log_req(job, req)
    return job


def send_batch_request(req):
    """ Send a batch request to EDDS server

    Return job_id
    """
    # The server answers with a list of job ids : keep the first one.
    job = client.service.batchRequest(ContextPart=CONTEXT_BATCH, BatchRequest=req)[0]
    print("Send batch request :", job)
    log_req(job, req)
    return job


def cancel_request(job_id):
    """ Cancel request on EDDS server """
    req = CancelPart(User=USER, JobIdPart=JobIdPart(Job=job_id))
    print("CANCEL_REQUEST", req)
    client.service.cancel(req)


def get_status(job):
    """ get request status, and loop until error or results available

    return status
    """
    while True:
        resp = client.service.getStatus(Job=job)
        status = resp["State"]
        ratio = resp["Percentage"] or 0.0
        sizes = [rf["Size"] for rf in resp["ResponseFiles"]]
        mb = sum(sizes) / (1e6)
        print(now(), "Status = %3.0f%% %7.3f MB : %s" % (ratio, mb, status))
        if status in STATE_KILL:
            print("Error =", resp["Error"])
            break
        if status in STATE_DONE:
            break
        time.sleep(BATCH_DELAY)
    return status


def get_response(job, output_file):
    """ Get request response and save to output file

    Retries while the server reports "No files found for request" ;
    any other SOAP fault is re-raised.
    """
    print("Getting response file(s)...")
    while True:
        try:
            resp = client.service.getResponse(Job=job)
        except zeep.exceptions.Fault as exc:
            print("ERROR :", exc.message)
            if "No files found for request" in exc.message:
                time.sleep(BATCH_DELAY)
            else:
                raise exc
        else:
            break

    print(now(), "Creation", output_file, "...")
    print(len(resp), "chunks")
    with open(output_file, "wb") as output:
        for chunk in resp:
            print(chunk["FileName"])
            output.write(chunk["Response"])
    print(now(), "Done")


def run_stream_client(job_id):
    """ Run Java StreamClient, to get data from an existing request job_id

    Update requestId parameter in config.properties
    """
    update_properties(job_id)
    # List form avoids spawning an intermediate shell.
    subprocess.call(["sh", STREAM_CLIENT])


def run_stream_TM(duration=DEFAULT_DURATION):
    """ Create a PktTmStream request, with a given duration

    Filtering PID in range [95,99] => whole SWA TM
    """
    req = create_stream_TM_request(duration, pids="95-99")
    job_id = send_stream_request(req)
    run_stream_client(job_id)


def run_stream_TC(duration=DEFAULT_DURATION):
    """ Create a PktTcStream request, with a given duration """
    req = create_stream_TC_request(duration)
    job_id = send_stream_request(req)
    run_stream_client(job_id)


def run_batch_TM_report(start, duration=DEFAULT_DURATION):
    """ Create a batch TM report request for a given datetime an duration

    Filtering TM with APID in range [1520, 1599] => whole SWA TM
    """
    req = create_batch_TM_report_request(start, duration, apids=(1520, 1599))
    job = send_batch_request(req)
    status = get_status(job)
    if status in STATE_DONE:
        output_file = make_output_filename("swa-batch-tm", start, duration, "bin")
        get_response(job, output_file)
    else:
        print("Job aborted :", job)


def run_batch_TC_report(start, duration=DEFAULT_DURATION):
    """ Create a batch TC report request for a given datetime an duration

    Filtering TC with PID in range [95, 99] => whole SWA TC
    """
    req = create_batch_TC_report_request(start, duration, pids=(95, 99))
    job = send_batch_request(req)
    status = get_status(job)
    if status in STATE_DONE:
        output_file = make_output_filename("swa-batch-tc", start, duration, "xml")
        get_response(job, output_file)
    else:
        print("Job aborted :", job)


def run_batch_param(start, duration=DEFAULT_DURATION, param=None):
    """ Create a batch Param request for a given datetime an duration

    param : list of parameter identifiers ; when empty or None, a default
    SWA parameter list is used.
    """
    if not param:
        param = ("NRU02395 NRU02687 NRU02615 NRU02626 NRU02611 NRU02406"
                 " NRU02684 NRU02602 NRU02389 NRU02686 NRU02612").split()
    req = create_batch_Param_request(start, duration, param)
    job = send_batch_request(req)
    status = get_status(job)
    if status in STATE_DONE:
        output_file = make_output_filename("swa-batch-param", start, duration, "xml")
        get_response(job, output_file)
    else:
        print("Job aborted :", job)


def check_stream_status():
    """ Get status of active stream request.

    requestId found in config.properties
    """
    job = properties["requestId"]
    print("Check status for job :", job)
    resp = client.service.getStatus(job)
    keywords = "RequestId State Error Reason SubmissionTime ExpiryTime".split()
    for key in keywords:
        print("%20s : %s" % (key, resp[key]))


def run_test(args):
    """ Make here your own tests.

    args is a list of command line argument strings
    """
    print("args = ", args)

    job = args[0]
    output_file = "output.bin"
    get_response(job, output_file)
    return

    # NOTE(review): scratch code below is unreachable (kept as an alternate test)
    start = isodate.parse_datetime(args[0])
    duration = isodate.parse_duration(args[1])
    dataset = "swa-batch-tm"
    ext = "bin"
    filename = make_output_filename(dataset, start, duration, ext)
    print("filename =", filename)


if __name__ == "__main__":
    """ Parse command line arguments and launch corresponding EDDS requests """
    import argparse

    parser = argparse.ArgumentParser()

    # Shared positional arguments, reused by several sub-commands.
    duration_parser = argparse.ArgumentParser(add_help=False)
    duration_parser.add_argument("duration",
                                 help="Request duration: ISO8601 duration string (P1D, PT8H)",
                                 type=isodate.parse_duration,
                                 nargs="?",
                                 default=DEFAULT_DURATION)

    datetime_parser = argparse.ArgumentParser(add_help=False)
    datetime_parser.add_argument("start",
                                 help="Start time: ISO8601 datetime string (2020-03-17T00, 2020-03-17T12:00:00)",
                                 type=isodate.parse_datetime)

    ## Add a sub parser for each kind of request
    #
    sub_parser = parser.add_subparsers(dest="req", help="sub-command help")

    ## Stream request arguments
    #
    stream_TM_parser = sub_parser.add_parser("stream_TM", parents=[duration_parser],
                                             help="Stream TM request")
    stream_TC_parser = sub_parser.add_parser("stream_TC", parents=[duration_parser],
                                             help="Stream TC request")

    ## Batch requests arguments
    #
    batch_TM_parser = sub_parser.add_parser("batch_TM",
                                            parents=[datetime_parser, duration_parser],
                                            help="Batch TM request")
    batch_TC_parser = sub_parser.add_parser("batch_TC",
                                            parents=[datetime_parser, duration_parser],
                                            help="Batch TC request")
    batch_param_parser = sub_parser.add_parser("batch_param",
                                               parents=[datetime_parser, duration_parser],
                                               help="Batch Param request")
    batch_param_parser.add_argument("param", nargs="*",
                                    help="List of Parameters name")

    ## Java client arguments
    #
    client_parser = sub_parser.add_parser("client", help="Java stream client")
    client_parser.add_argument("job_id",
                               help="Launch Java stream client on existing stream request Job Id: string")

    ## stream status parser
    #
    stream_status_parser = sub_parser.add_parser("status",
                                                 help="Check current stream request status")

    ## Test function arguments (list of strings)
    #
    test_parser = sub_parser.add_parser("test", help="Run test function")
    test_parser.add_argument("values", action="append", nargs="*")

    ## Cancel request arguments
    #
    cancel_parser = sub_parser.add_parser("cancel", help="Cancel existing request")
    cancel_parser.add_argument("job_id", help="Cancel request Id")

    ## Map command line parameters to processing functions
    #
    mapping = {
        "stream_TC": lambda args: run_stream_TC(args.duration),
        "stream_TM": lambda args: run_stream_TM(args.duration),
        "status": lambda args: check_stream_status(),
        "batch_TC": lambda args: run_batch_TC_report(args.start, args.duration),
        "batch_TM": lambda args: run_batch_TM_report(args.start, args.duration),
        "batch_param": lambda args: run_batch_param(args.start, args.duration, args.param),
        "client": lambda args: run_stream_client(args.job_id),
        "test": lambda args: run_test(*args.values),
        "cancel": lambda args: cancel_request(args.job_id)
    }

    args = parser.parse_args()

    # Look up the handler explicitly : a try/except KeyError here would
    # also swallow KeyErrors raised inside the handlers themselves.
    handler = mapping.get(args.req)
    if handler is not None:
        login()
        handler(args)

    print("Good bye...")