Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tests from gsa fork. Fix custom schema errors on Dep of Ed sources #2

Open
wants to merge 20 commits into
base: us-ed
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@
build
ckanext_datajson.egg-info
.DS_Store
ckanext/datajson/export_map/*.map.json
ckanext/datajson/export_map/*.map.json
.vscode/
42 changes: 37 additions & 5 deletions ckanext/datajson/harvester_base.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import re
from ckan.lib.base import c
from ckan import model
from ckan import plugins as p
from ckan.model import Session, Package
from ckan.logic import ValidationError, NotFound, get_action
from ckan.lib.munge import munge_title_to_name
from ckan.lib.munge import munge_title_to_name, munge_tag
from ckan.lib.search.index import PackageSearchIndex
from ckan.lib.navl.dictization_functions import Invalid
from ckan.lib.navl.validators import ignore_empty

from ckan.model import MAX_TAG_LENGTH, MIN_TAG_LENGTH
from ckanext.harvest.model import HarvestJob, HarvestObject, HarvestGatherError, \
HarvestObjectError, HarvestObjectExtra
from ckanext.harvest.harvesters.base import HarvesterBase
Expand All @@ -22,13 +23,31 @@
from .helpers import reverse_accrual_periodicity_dict

import logging
log = logging.getLogger("harvester")
log = logging.getLogger(__name__)

VALIDATION_SCHEMA = [
('', 'Project Open Data (Federal)'),
('non-federal', 'Project Open Data (Non-Federal)'),
]


def clean_tags(tags):
ret = []
pattern = re.compile('[^A-Za-z0-9\s_\-!?]+')

for tag in tags:
tag = pattern.sub('', tag).strip()
if len(tag) > MAX_TAG_LENGTH:
log.error('tag is long, cutting: {}'.format(tag))
tag = tag[:MAX_TAG_LENGTH]
elif len(tag) < MIN_TAG_LENGTH:
log.error('tag is short: {}'.format(tag))
tag += '_' * (MIN_TAG_LENGTH - len(tag))
if tag != '':
ret.append(tag.lower().replace(' ', '-')) # copyin CKAN behaviour
return ret


def validate_schema(schema):
if schema not in [s[0] for s in VALIDATION_SCHEMA]:
raise Invalid('Unknown validation schema: {0}'.format(schema))
Expand Down Expand Up @@ -163,6 +182,7 @@ def gather_stage(self, harvest_job):
# Added: mark all existing parent datasets.
existing_datasets = { }
existing_parents = { }
log.info('Reading previously harvested packages from this source')
for hobj in model.Session.query(HarvestObject).filter_by(source=harvest_job.source, current=True):
try:
pkg = get_action('package_show')(self.context(), { "id": hobj.package_id })
Expand All @@ -172,7 +192,10 @@ def gather_stage(self, harvest_job):
sid = self.find_extra(pkg, "identifier")
is_parent = self.find_extra(pkg, "collection_metadata")
if sid:
log.info('Identifier: {} (ID:{})'.format(sid, pkg['id']))
existing_datasets[sid] = pkg
else:
log.info('The dataset has no identifier:{}'.format(pkg))
if is_parent and pkg.get("state") == "active":
existing_parents[sid] = pkg

Expand Down Expand Up @@ -293,9 +316,11 @@ def gather_stage(self, harvest_job):
and dataset['identifier'] not in existing_parents_demoted \
and dataset['identifier'] not in existing_datasets_promoted \
and self.find_extra(pkg, "source_hash") == self.make_upstream_content_hash(dataset, harvest_job.source, catalog_extras, schema_version):
log.info('Package {} don\'t need update. Leave'.format(pkg['id']))
continue
else:
pkg_id = uuid.uuid4().hex
log.info('Package (identifier:{}) is new, it will be created as {}'.format(dataset['identifier'], pkg_id))

# Create a new HarvestObject and store in it the GUID of the
# existing dataset (if it exists here already) and the dataset's
Expand Down Expand Up @@ -484,7 +509,8 @@ def import_stage(self, harvest_object):
"modified": "modified", # ! revision_timestamp
"publisher": {"name": "publisher"}, # !owner_org
"contactPoint": {"fn":"contact_name", "hasEmail":"contact_email"},
"identifier": "unique_id", # !id
# for USMetadata "identifier": "unique_id", # !id
"identifier": "extras__identifier",
"accessLevel": "public_access_level",

"bureauCode": "bureau_code[]",
Expand Down Expand Up @@ -651,7 +677,11 @@ def import_stage(self, harvest_object):

# fix for tag_string
if 'tags' in pkg:
pkg['tag_string'] = ''
tags = pkg['tags']
log.info('Tags: {}'.format(tags))
cleaned_tags = clean_tags(tags)
tag_string = ', '.join(cleaned_tags)
pkg['tag_string'] = tag_string

# pick a fix number of unmapped entries and put into extra
if unmapped:
Expand Down Expand Up @@ -701,6 +731,7 @@ def import_stage(self, harvest_object):

log.warn('updating package %s (%s) from %s' % (pkg["name"], pkg["id"], harvest_object.source.url))
pkg = get_action('package_update')(self.context(), pkg)
log.info('Package updated {}'.format(pkg))
else:
# It doesn't exist yet. Create a new one.
pkg['name'] = self.make_package_name(dataset_processed["title"], harvest_object.guid)
Expand All @@ -717,6 +748,7 @@ def import_stage(self, harvest_object):
except:
log.error('failed to create package %s from %s' % (pkg["name"], harvest_object.source.url))
raise
log.info('Package created {}'.format(pkg))

# Flag the other HarvestObjects linking to this package as not current anymore
for ob in model.Session.query(HarvestObject).filter_by(package_id=pkg["id"]):
Expand Down
55 changes: 44 additions & 11 deletions ckanext/datajson/harvester_datajson.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from ckanext.datajson.harvester_base import DatasetHarvesterBase
from parse_datajson import parse_datajson_entry
from parse_dep_of_ed import parse_datajson_entry_for_dep_of_ed_schema
import logging
log = logging.getLogger(__name__)


import urllib2, json, ssl
Expand All @@ -19,20 +22,48 @@ def info(self):
}

def load_remote_catalog(self, harvest_job):
req = urllib2.Request(harvest_job.source.url)
url = harvest_job.source.url
log.info('Loading catalog from URL: {}'.format(url))
req = urllib2.Request(url)
# todo: into config and across harvester
req.add_header('User-agent', 'Data.gov/2.0')

try:
datasets = json.load(urllib2.urlopen(req))
conn = urllib2.urlopen(req)
except Exception, e:
log.error('Failed to connect to {}: {} ({})'.format(url, e, type(e)))
# try to avoid SSL errors
try:
conn = urllib2.urlopen(req, context=ssl._create_unverified_context())
except Exception as e:
log.error('Failed (SSL) to connect to {}: {} ({})'.format(url, e, type(e)))
raise

data_readed = conn.read()
# remove BOM_UTF8 if exists
clean_data_readed, bom_removed = lstrip_bom(data_readed)
if bom_removed:
log.info('BOM_UTF8 removed from URL: {}'.format(url))

try:
datasets = json.loads(clean_data_readed)
except UnicodeDecodeError:
# try different encode
try:
datasets = json.load(urllib2.urlopen(req), 'cp1252')
except:
datasets = json.load(urllib2.urlopen(req), 'iso-8859-1')
except:
# remove BOM
datasets = json.loads(lstrip_bom(urllib2.urlopen(req, context=ssl._create_unverified_context()).read()))
log.error('Unicode Error at {}'.format(url))
charsets = ['cp1252', 'iso-8859-1']
datasets = None
for charset in charsets:
try:
data_decoded = clean_data_readed.decode(charset)
datasets = json.loads(data_decoded)
log.info('Charset detected {} for {}'.format(charset, url))
break
except:
log.error('Failed to load URL {} with {} charset'.format(url, charset))

if datasets is None:
raise ValueError('Unable to decode data from {}. Charsets: utf8, {}'.format(url, charsets))


# The first dataset should be for the data.json file itself. Check that
# it is, and if so rewrite the dataset's title because Socrata exports
Expand All @@ -50,16 +81,18 @@ def load_remote_catalog(self, harvest_job):
catalog_values = datasets.copy()
datasets = catalog_values.pop("dataset", [])

log.info('Catalog Loaded from URL: {}: {} datasets found'.format(url, len(datasets)))
return (datasets, catalog_values)

def set_dataset_info(self, pkg, dataset, dataset_defaults, schema_version):
parse_datajson_entry(dataset, pkg, dataset_defaults, schema_version)
parse_datajson_entry_for_dep_of_ed_schema(dataset, pkg, dataset_defaults, schema_version)

# helper function to remove BOM
def lstrip_bom(str_):
from codecs import BOM_UTF8
bom = BOM_UTF8
if str_.startswith(bom):
return str_[len(bom):]
return str_[len(bom):], True
else:
return str_
return str_, False
10 changes: 8 additions & 2 deletions ckanext/datajson/parse_datajson.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
from ckan.lib.munge import munge_title_to_name

import logging
import re


log = logging.getLogger(__name__)


def parse_datajson_entry(datajson, package, defaults, schema_version):
# four fields need extra handling, which are
# 1.tag, 2.license, 3.maintainer_email, 4.publisher_hierarchy,
# 5.resources

log.info('Parsing datajson entry: {}'.format(package))
# 1. package["tags"]
package["tags"] = [ { "name": munge_title_to_name(t) } for t in
package.get("tags", "") if t.strip() != ""]
Expand Down Expand Up @@ -142,6 +146,8 @@ def parse_datajson_entry(datajson, package, defaults, schema_version):
r['accessURL'] = accessurl_value

package["resources"].append(r)

log.info('Finished Parsing datajson entry: {}'.format(package))

def extra(package, key, value):
if not value: return
Expand Down
32 changes: 32 additions & 0 deletions ckanext/datajson/parse_dep_of_ed.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""
Temporal fixes to fit the Dep of Ed schema
"""
import logging
import ckan.model as model
log = logging.getLogger(__name__)


def parse_datajson_entry_for_dep_of_ed_schema(datajson, package, defaults, schema_version):
# temporal FIX
log.info('Parsing datajson entry for dep of ed : {}'.format(package))

is_private = package.get('private', False)
package['private'] = is_private

if schema_version == '1.1':
author_email = package.get('contact_email', '[email protected]')
author = package.get('contact_name', 'Unknown author')
elif schema_version == '1.0':
author_email = package.get('maintainer_email', '[email protected]')
author = package.get('maintainer', 'Unknown author')

package['author'] = author
package['author_email'] = author_email

# require vocabularies created !
# paster --plugin=ckanext-ed ed create_ed_vocabularies

spatial = package.get('spatial', 'United States')
package['spatial'] = spatial

log.info('Finished Parsing datajson entry for dep of ed : {}'.format(package))
Loading