Source code for task_geo.data_sources.noaa.ftp_connector

"""Extract and prepare meteorological data from https://www.ncdc.noaa.gov/
the National Centers for Environmental Information

Credits for the data:

Menne, M.J., I. Durre, B. Korzeniewski, S. McNeal, K. Thomas, X. Yin, S. Anthony, R. Ray,
R.S. Vose, B.E. Gleason, and T.G. Houston, 2012: Global Historical Climatology Network -
Daily (GHCN-Daily), Version 3. [indicate subset used following decimal,
e.g. Version 3.12].
NOAA National Climatic Data Center. http://doi.org/10.7289/V5D21VHZ [2020].
"""

import logging
import os
import re
import tarfile as tar
from datetime import datetime
from ftplib import FTP

import pandas as pd

from task_geo.data_sources.noaa.references import DATA_DIRECTORY, load_dataset

logging.basicConfig(level=logging.DEBUG)


NOAA_FTP_FILES = {
    'readme.txt': 'RETR readme.txt',
    'stations_metadata.txt': 'RETR ghcnd-stations.txt',
    'country_codes.txt': 'RETR ghcnd-countries.txt',
    'stations_inventory.txt': 'RETR ghcnd-inventory.txt',
    'all_daily_data.tar.gz': 'RETR ghcnd_all.tar.gz'
}
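
# Illustration: each entry maps a local filename to the FTP RETR command that
# fetches it; download_noaa_files() recovers the server-side filename by
# splitting the command on the space, e.g.:
#
#     action = NOAA_FTP_FILES['country_codes.txt']   # 'RETR ghcnd-countries.txt'
#     server_file_name = action.split(' ')[1]        # 'ghcnd-countries.txt'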

INDEX_COLUMNS = [
    'ID',
    'YEAR',
    'MONTH',
    'ELEMENT'
]
REGEX = re.compile(r'^(\w*?)(\d+)$')
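
# Illustration: REGEX splits a day-suffixed column name into its prefix and
# day number (naming as in the GHCN-Daily layout), e.g.:
#
#     REGEX.match('VALUE12').groups()  # ('VALUE', '12')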


def download_noaa_files(large_files=True, skip_downloaded=False):
    """Download files from the NOAA FTP server.

    Arguments:
        large_files(bool): Whether or not to download the 3GB+ daily reports;
            if False, only the reference data is downloaded.
        skip_downloaded(bool): If True, a file that already exists locally
            with the same size as on the server is skipped; if False, it is
            downloaded again.

    Returns:
        None. The files are downloaded into DATA_DIRECTORY.
    """
    logging.info('Connecting to NOAA FTP server.')
    ftp = FTP('ftp.ncdc.noaa.gov')
    ftp.login()
    ftp.cwd('/pub/data/ghcn/daily/')

    for filename, action in NOAA_FTP_FILES.items():
        if filename.endswith('tar.gz'):
            if not large_files:
                continue
            logging.debug('This file is 3GB+; it may take a long time.')

        logging.debug('Downloading %s', filename)
        path = os.path.join(DATA_DIRECTORY, filename)
        if not os.path.isdir(DATA_DIRECTORY):
            os.mkdir(DATA_DIRECTORY)

        if skip_downloaded:
            server_file_name = action.split(' ')[1]
            if os.path.exists(path) and ftp.size(server_file_name) == os.stat(path).st_size:
                continue

        with open(path, 'wb') as fp:
            ftp.retrbinary(action, fp.write)

    if large_files:
        logging.debug('Extracting daily data.')
        tar_path = f'{DATA_DIRECTORY}/all_daily_data.tar.gz'
        with tar.open(tar_path) as tar_all:
            tar_all.extractall(path=f'{DATA_DIRECTORY}/all_daily/')

    logging.debug('Done!')


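# Usage sketch (not part of the original module): fetch the small reference
# files first, then the full ~3GB archive, skipping files already present
# locally with a matching size:
#
#     download_noaa_files(large_files=False)
#     download_noaa_files(large_files=True, skip_downloaded=True)

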
def to_date(row):
    """Build a datetime from the YEAR/MONTH/DAY fields, or None if invalid."""
    try:
        return datetime(row['YEAR'], row['MONTH'], row['DAY'])
    except ValueError:
        return None


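# The GHCN-Daily layout pads every month to 31 day-columns, so impossible
# dates such as February 30 appear after reshaping; to_date() maps them to
# None so they can be dropped later, e.g.:
#
#     to_date({'YEAR': 2020, 'MONTH': 2, 'DAY': 30})  # None
#     to_date({'YEAR': 2020, 'MONTH': 2, 'DAY': 29})  # datetime(2020, 2, 29)

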
def load_and_filter_dataset(dataset_name):
    """Load a station dataset, keeping only rows from November 2019 onwards."""
    dataset = load_dataset(dataset_name)
    return dataset[
        (dataset.YEAR >= 2020) | ((dataset.YEAR == 2019) & (dataset.MONTH >= 11))
    ]


def load_stations_data(station_ids):
    """Load and reshape the daily data for the given stations.

    The raw layout has one row per (ID, YEAR, MONTH, ELEMENT) and one column
    per day of the month. The day suffix is split off each column name, the
    frame is stacked into long form, then unstacked again so that each row
    corresponds to a single day.
    """
    stations_data = pd.concat([
        load_and_filter_dataset(station_id)
        for station_id in station_ids
    ]).set_index(INDEX_COLUMNS)

    stacked = stations_data.stack().reset_index(level=-1)
    stacked.columns = ['name', 'value']

    # Split day-suffixed column names like 'VALUE12' into ('VALUE', 12).
    column_day = stacked.name.str.extract(REGEX)
    stacked['column'] = column_day[0]
    stacked['DAY'] = column_day[1].astype(int)
    del stacked['name']

    values = stacked.set_index(['DAY', 'column'], append=True)
    unstacked = values.unstack(level=-1)
    unstacked.columns = unstacked.columns.droplevel(0)
    unstacked.reset_index(inplace=True)
    unstacked.columns = list(unstacked.columns)

    # Invalid dates (e.g. day 31 of a 30-day month) become None and are dropped.
    unstacked['DATE'] = unstacked.apply(to_date, axis=1)
    unstacked.drop(labels=['YEAR', 'MONTH', 'DAY'], axis=1, inplace=True)
    return unstacked.dropna(subset=['DATE'])


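# Minimal sketch of the reshape on hypothetical data: day-suffixed columns
# (VALUE1, VALUE2, ...) become a DAY index level plus a plain VALUE column:
#
#     wide = pd.DataFrame({
#         'ID': ['US1', 'US1'], 'YEAR': [2020, 2020], 'MONTH': [3, 3],
#         'ELEMENT': ['TMAX', 'TMIN'], 'VALUE1': [150, 50], 'VALUE2': [160, 60],
#     }).set_index(INDEX_COLUMNS)
#
#     stacked = wide.stack().reset_index(level=-1)
#     stacked.columns = ['name', 'value']
#     parts = stacked['name'].str.extract(REGEX)
#     stacked['column'], stacked['DAY'] = parts[0], parts[1].astype(int)
#     tidy = (stacked.drop(columns='name')
#                    .set_index(['DAY', 'column'], append=True)
#                    .unstack(level=-1))
#     tidy.columns = tidy.columns.droplevel(0)
#     # tidy: one row per (ID, YEAR, MONTH, ELEMENT, DAY), one 'VALUE' column.

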
def process_noaa_files(countries):
    """Return a dataset for the given countries.

    Arguments:
        countries(list[str]): List of countries in ISO-2 format.

    Returns:
        pandas.DataFrame
    """
    df_stations = load_dataset('stations')
    df_countries = load_dataset('countries')

    # Join the country onto df_stations: the first two characters of a
    # station ID are its country code.
    df_stations['COUNTRY_CODE'] = df_stations['ID'].str.slice(0, 2)
    df_stations = df_stations.merge(df_countries, on=['COUNTRY_CODE'], how='left')
    df_stations = df_stations[df_stations['COUNTRY_CODE'].isin(countries)]

    df_daily_information = load_stations_data(df_stations['ID'])
    return df_daily_information.merge(df_stations, how='left', on=['ID'])


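# Usage sketch (hypothetical country list, not part of the original module):
#
#     df = process_noaa_files(['US', 'FR'])

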
def noaa_ftp_connector(countries, download=True):
    """Retrieve data from the NOAA FTP server.

    Arguments:
        countries(list[str]): List of countries in ISO-2 format.
        download(bool): Whether or not to download the data; if False, only
            previously downloaded data is processed.
    """
    if download:
        download_noaa_files()

    return process_noaa_files(countries)
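
# Usage sketch: download everything once, then reprocess the local copy
# without re-downloading:
#
#     data = noaa_ftp_connector(['US'], download=True)
#     data = noaa_ftp_connector(['US'], download=False)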