DPP_PROCESSOR_PATH does not work with flow #183

Open
3 tasks
cschloer opened this issue Mar 25, 2020 · 0 comments

cschloer commented Mar 25, 2020

Overview

Python 3.7.5, Ubuntu

If you use the flow key instead of run, custom flows saved under a path stored in DPP_PROCESSOR_PATH are not loaded properly. Running plain dpp correctly lists the pipeline as valid, but actually running the pipeline fails with:

 ModuleNotFoundError: No module named 'bcodmo_pipeline_processors'

To reproduce:

DPP_PROCESSOR_PATH=~/path/to/processors/

pipeline-spec.yaml:

test:
  title: test
  description: this is a test 
  pipeline:
    - flow: bcodmo_pipeline_processors.load
      parameters:
        format: csv
        from: /path/to/load/from
        name: res

where a load flow is stored under:

/path/to/processors/bcodmo_pipeline_processors/load.py

and the load flow looks like:

from dataflows import Flow, load
from datapackage_pipelines.utilities.resources import PROP_STREAMING, PROP_STREAMED_FROM


def flow(parameters, datapackage, resources, stats):

    def count_resources():
        def func(package):
            global num_resources
            num_resources = len(package.pkg.resources)
            yield package.pkg
            yield from package
        return func

    def mark_streaming(_from):
        def func(package):
            for i in range(num_resources, len(package.pkg.resources)):
                package.pkg.descriptor['resources'][i].setdefault(PROP_STREAMING, True)
                package.pkg.descriptor['resources'][i].setdefault(PROP_STREAMED_FROM,  _from)
            yield package.pkg
            yield from package
        return func

    _from = parameters.pop('from')
    return Flow(
        count_resources(),
        load(_from, **parameters),  # forward the remaining options as keyword arguments
        mark_streaming(_from),
    )
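A possible workaround, not verified against datapackage-pipelines itself: the ModuleNotFoundError suggests the flow key goes through Python's normal import machinery, which only searches sys.path (and therefore PYTHONPATH), not DPP_PROCESSOR_PATH. The sketch below reproduces that behaviour in isolation and shows that appending the processor directories to sys.path makes the import succeed. The helper add_processor_paths, the package name demo_processors_pkg, and the use of os.pathsep as the separator are all assumptions made for illustration.

```python
import importlib
import os
import sys
import tempfile
from pathlib import Path


def add_processor_paths(env_value, separator=os.pathsep):
    """Append each non-empty entry of a DPP_PROCESSOR_PATH-style value to sys.path.

    Hypothetical helper; the separator is an assumption.
    Returns the directories that were newly added.
    """
    added = []
    for entry in env_value.split(separator):
        entry = os.path.expanduser(entry.strip())
        if entry and entry not in sys.path:
            sys.path.append(entry)
            added.append(entry)
    return added


def demo():
    """Show the import failing before, and working after, the path is added."""
    with tempfile.TemporaryDirectory() as root:
        # Build a throwaway package that mimics <processors>/<package>/load.py.
        pkg = Path(root) / "demo_processors_pkg"
        pkg.mkdir()
        (pkg / "__init__.py").write_text("")
        (pkg / "load.py").write_text(
            "def flow(parameters, *_):\n    return parameters\n"
        )

        # The directory is not on sys.path yet, so the import fails.
        try:
            importlib.import_module("demo_processors_pkg.load")
            failed_before = False
        except ModuleNotFoundError:
            failed_before = True

        # After adding the directory, the same import succeeds.
        added = add_processor_paths(root)
        importlib.invalidate_caches()
        module = importlib.import_module("demo_processors_pkg.load")
        result = module.flow({"format": "csv"})

        # Undo the global changes so the demo leaves no trace.
        for path in added:
            sys.path.remove(path)
        sys.modules.pop("demo_processors_pkg.load", None)
        sys.modules.pop("demo_processors_pkg", None)
        return failed_before, result


if __name__ == "__main__":
    print(demo())
```

If this diagnosis is right, exporting PYTHONPATH with the same directory as DPP_PROCESSOR_PATH before running the pipeline might serve as a stopgap until the flow key honours DPP_PROCESSOR_PATH directly.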

Tasks

  • review
  • fix if there is a quick fix OR
  • remove from the BCO-DMO as it's not needed anymore
@roll roll added wip and removed wip labels Apr 20, 2020