"""Access to CSA with the new TAP protocol """ from __future__ import print_function import requests import lxml.etree as ET import datetime import time import os import re BASE_URL = "https://csa.esac.esa.int/csa-sl-tap" __user = "abarthe" __pswd = "pm2mdp2CSA!" DELAY = 3 session = requests.Session() def isotime (d): """ datetime => yyyy-mm-ddThh:mm:ssZ """ return d.strftime ("%FT%TZ") def login (): """ Login to CSA and create a cookie, embedded in session """ login_url = "%s/login" % BASE_URL print ("Login on", login_url) auth = dict (username = __user, password = __pswd) r = session.post (login_url, data = auth) if r.status_code != 200: print ("Error: status code =", r.status_code) def download_file (url): """ Download the resulting file (add .zip extension) """ r = session.get (url) output_file = os.path.basename (url) + ".zip" print ("OUTPUT =", output_file) with open (output_file, "w") as output: output.write (r.content) print ("Done...") def parse_xml_response (content): """ Parse XML response to extract (url, phase, href) """ root = ET.XML (content) url = root.xpath ("//uws:parameter[@id='email_base_url']/text()", namespaces = root.nsmap)[0] phase = root.xpath ("//uws:phase/text()", namespaces = root.nsmap)[0] if phase == "ERROR": print (content) if phase == "COMPLETED": href = root.xpath ("//uws:result/@xlink:href", namespaces = root.nsmap)[0] else: href = None return url, phase, href def get_async_data (dataset, start, end): """ Asynchronous data request using TAP protocol """ print ("REQUEST =", dataset, ":", isotime (start), "/", isotime (end)) async_url = "%s/data" % BASE_URL param = dict ( RETRIEVAL_TYPE = "product", RETRIEVAL_ACCESS = "DEFERRED", DATASET_ID = dataset, START_DATE = isotime (start), END_DATE = isotime (end) ) r = session.get (async_url, params = param) if not r.headers["Content-Type"].startswith ("text/xml"): print ("Unexpected content type :", r.headers["Content-Type"]) return url, phase, href = parse_xml_response (r.content) print ("URL =", url) print ("PHASE =", phase) while phase != "COMPLETED": time.sleep (DELAY) r = session.get (url) url, phase, href = parse_xml_response (r.content) print ("PHASE =", phase) print ("HREF =", href) download_file (href) def get_data (dataset, start, end): """ Synchronous data request using TAP protocol """ print ("REQUEST =", dataset, ":", isotime (start), "/", isotime (end)) url = "%s/data" % BASE_URL param = dict ( RETRIEVAL_TYPE = "product", DELIVERY_FORMAT = "CDF", DATASET_ID = dataset, START_DATE = isotime (start), END_DATE = isotime (end) ) print (url, param) r = session.get (url, params = param) expr = re.compile ("filename=\"([^\"]*)") s = expr.search (r.headers["Content-disposition"]) if s: filename = s.group(1) print ("Extracting", filename) with open (filename, "w") as output: output.write (r.content) print ("Done.") def get_stream_data (dataset, start, end): """ Download CEF data using streaming mode """ url = "%s/data" % BASE_URL param = dict ( RETRIEVAL_TYPE = "product", DATASET_ID = dataset, START_DATE = isotime (start), END_DATE = isotime (end), RETRIEVAL_ACCESS = "streamed" ) print (url, param) r = session.get (url, params = param) return r.content def test(): """ Example of use """ login() start = datetime.datetime (2020, 4, 14, 12, 0) stop = start + datetime.timedelta (hours = 3) dataset = "C4_CP_FGM_SPIN" print (get_stream_data (dataset, start, stop)) # print (get_data (dataset, start, stop)) if __name__ == "__main__": test()