regular expression matching on files #11

Open
wants to merge 15 commits into base: master
31 changes: 28 additions & 3 deletions README.md
@@ -77,6 +77,13 @@ for these tools, but rather to be the glue that sticks them together.
simulation that takes a long time.


### examples

There are several [examples](examples/) for inspiration on how you
could use the workflow.yaml specification. If you have suggestions for
other ideas, please [add them](issues)!


### workflow.yaml specification

Individual analysis tasks are defined as
@@ -215,9 +222,27 @@ tasks:
command: python {{depends|join(' ')}} > {{creates}}
```

There are several [examples](examples/) for more inspiration on how
you could use the workflow.yaml specification. If you have suggestions
for other ideas, please [add them](issues)!
##### regular expression matching

When working with a large number of files, there are generally two
approaches: write a script that analyzes all of the files at once, or
write a script that analyzes each file individually. One of the core
premises of data-workflow is to facilitate rapid iteration, which, in
our opinion, is best done by working on specific files first.

The challenge is that it is difficult to know, *a priori*, which files
are the best candidates for preliminary work. One way of addressing
this is to specify a task's dependencies with regular expressions,
which can then be narrowed down from the command line:

```yaml
---
alias: data/rgb
creates: "data/rgb/{{subdirectory}}/{{rootname}}.dat"
depends:
  - src/calculate_pixel_rgb.py
  - data/img/(?P<subdirectory>\w+)/(?P<rootname>\w+).jpg
  - data/img
command:
  - mkdir -p $(dirname {{creates}})
  - python {{depends|join(' ')}} > {{creates}}
```

The way this works is that `workflow` dynamically adds a sub-task for
each existing file that matches the specified regular expression. The
named groups in the pattern (`subdirectory` and `rootname` above) are
used to fill in the `creates` template for each matching file, and
they are also exposed as command line options (`--subdirectory` and
`--rootname`) so that you can run the analysis on a single matching
file while you iterate.
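
To make the mechanics concrete, here is a minimal sketch (not the
actual implementation) of how the named groups from one matched file
could fill in the `creates` template. It assumes jinja2-style
templating, which the `{{depends|join(' ')}}` filter syntax suggests,
and the image filename is hypothetical:

```python
import re

from jinja2 import Template

# pattern and template taken from the example task above
pattern = r"data/img/(?P<subdirectory>\w+)/(?P<rootname>\w+).jpg"
creates_template = "data/rgb/{{subdirectory}}/{{rootname}}.dat"

# a hypothetical file discovered by crawling the project directory
match = re.match(pattern, "data/img/abstract/some_image.jpg")
if match:
    # the named groups parameterize the output filename of this sub-task
    print(Template(creates_template).render(**match.groupdict()))
    # -> data/rgb/abstract/some_image.dat
```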


### command line interface
11 changes: 11 additions & 0 deletions examples/DEBIAN
@@ -5,3 +5,14 @@ unzip
# Quick 'n dirty way of installing numpy and matplotlib dependencies
python-numpy
python-matplotlib

# Pillow dependencies
libtiff4-dev
libjpeg8-dev
zlib1g-dev
libfreetype6-dev
liblcms2-dev
libwebp-dev
tcl8.5-dev
tk8.5-dev
python-tk
1 change: 1 addition & 0 deletions examples/REQUIREMENTS
@@ -1,4 +1,5 @@
textblob
Pillow

# although these are installed in the system dependencies, also need
# to do this here so that they will be installed in the virtual
14 changes: 14 additions & 0 deletions examples/image-colors/src/calcualte_pixel_rgb.py
@@ -0,0 +1,14 @@
"""read the pixel-by-pixel RGB of an image and output the rgb values
as a function of space
"""

import sys

from PIL import Image
image = Image.open(sys.argv[1])
pix = image.load()
width, height = image.size
for x in range(width):
    for y in range(height):
        r, g, b = pix[x, y]
        print x, y, r, g, b
20 changes: 20 additions & 0 deletions examples/image-colors/src/calculate_pixel_rgb.py
@@ -0,0 +1,20 @@
"""read the pixel-by-pixel RGB of an image and output the rgb values
as a function of space
"""

import sys

from PIL import Image
image = Image.open(sys.argv[1])
pix = image.load()
width, height = image.size
for x in range(width):
    for y in range(height):

        # black/white images just store a single value. cast as rgb
        # here for convenience downstream
        val = pix[x, y]
        if isinstance(val, int):
            print x, y, val, val, val
        else:
            print x, y, ' '.join(map(str, val))
17 changes: 17 additions & 0 deletions examples/image-colors/workflow.yaml
@@ -17,5 +17,22 @@ depends:
- data/abstract.zip
- data/artphoto.zip
command:
- rm -rf {{creates}}
- mkdir -p {{creates}}
- for archive in {{depends|join(' ')}}; do unzip -d {{creates}} $archive; done

# this is an example of how to use regular expressions to make it easy
# to run the analysis on one file at a time rather than on every image
# at once. the data/img dependency sets up the dependency on the
# previous task; the regular expression is what makes it possible to
# run this task for a single matching file.
---
alias: data/rgb
creates: "data/rgb/{{subdirectory}}/{{rootname}}.dat"
depends:
- src/calculate_pixel_rgb.py
- data/img/(?P<subdirectory>\w+)/(?P<rootname>\w+).jpg
- data/img
command:
- mkdir -p $(dirname {{creates}})
- python {{depends|join(' ')}} > {{creates}}
1 change: 1 addition & 0 deletions workflow/commands/archive.py
@@ -6,6 +6,7 @@
from ..tasks.graph import TaskGraph
from ..exceptions import ConfigurationNotFound


class Command(BaseCommand):
    help_text = "Create and restore backup archives of workflows."

35 changes: 31 additions & 4 deletions workflow/commands/run.py
@@ -1,5 +1,6 @@
import os
import sys
import re

from ..exceptions import ShellError, CommandLineException
from ..notify import notify
@@ -9,13 +10,16 @@
class Command(BaseCommand, TaskIdMixin):
    help_text = "Run the task workflow."

    def inner_execute(self, task_id, force, dry_run):

    def inner_execute(self, task_id, force, dry_run, **regex_limitations):
        # restrict task graph as necessary for the purposes of running
        # the workflow
        if task_id is not None:
            self.task_graph = self.task_graph.subgraph_needed_for([task_id])

        # update the regex limitations on the graph if they are
        # specified on the command line
        self.task_graph.regex_limitations.update(regex_limitations)

        # when the workflow is --force'd, this runs all
        # tasks. Otherwise, only runs tasks that are out of sync.
        if force:
@@ -28,10 +32,10 @@ def inner_execute(self, task_id, force, dry_run):
        self.task_graph.successful = True

    def execute(self, task_id=None, force=False, dry_run=False,
                notify_emails=None):
                notify_emails=None, **kwargs):
        super(Command, self).execute()
        try:
            self.inner_execute(task_id, force, dry_run)
            self.inner_execute(task_id, force, dry_run, **kwargs)
        except CommandLineException, e:
            raise
        finally:
@@ -63,4 +67,27 @@ def add_command_line_options(self):
            nargs=1,
            help='Specify an email address to notify on completion.',
        )
        self.add_regex_options()
        self.add_task_id_option('Specify a particular task to run.')

    def add_regex_options(self):
        group = self.option_parser.add_argument_group(
            'Regular expression completion options'
        )
        for task_kwargs in self.task_kwargs_list:
            depends = task_kwargs.get('depends')
            if isinstance(depends, (str, unicode)):
                self.add_regex_options_helper(group, depends)
            elif isinstance(depends, (list, tuple)):
                for d in depends:
                    self.add_regex_options_helper(group, d)

    def add_regex_options_helper(self, group, resource):
        resource_regex = re.compile(resource)
        for regex_option in resource_regex.groupindex:
            group.add_argument(
                '--' + regex_option.replace('_', '-'),
                type=str,
                nargs="?",
                help='Specify particular regex match for <%s>' % regex_option,
            )
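
As a rough, standalone illustration of what `add_regex_options` ends up
doing: each named group in a dependency's regular expression is
enumerated via `groupindex` and turned into a command line option, so
the image-colors pattern yields `--subdirectory` and `--rootname`
flags. The sketch below uses a bare `argparse` parser and hypothetical
values rather than the project's own option parser:

```python
import argparse
import re

# each named group in the dependency's regular expression becomes an option
parser = argparse.ArgumentParser()
group = parser.add_argument_group('Regular expression completion options')
resource = r"data/img/(?P<subdirectory>\w+)/(?P<rootname>\w+).jpg"
for regex_option in re.compile(resource).groupindex:
    group.add_argument(
        '--' + regex_option.replace('_', '-'),
        type=str,
        nargs="?",
        help='Specify particular regex match for <%s>' % regex_option,
    )

# e.g. `--subdirectory abstract --rootname some_image` (values hypothetical)
print(parser.parse_args(['--subdirectory', 'abstract', '--rootname', 'some_image']))
```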
69 changes: 62 additions & 7 deletions workflow/resources/__init__.py
@@ -1,21 +1,76 @@
import os
import re

from . import base
from .file_system import FileSystem
from ..exceptions import NonUniqueTask


def get_or_create(graph, candidate_list):
def get_or_create(task, candidate_list, creates_or_depends):
"""This is a factory function that instantiates resources from a
candidate_list. Each candidate in the candidate_list must be a
string.
candidate_list.
"""
resources = []
for candidate in candidate_list:
if candidate is not None:

# check if resource has already been created for this graph
# and, if not, create it
# instantiate a new resource if it doesn't already exist
try:
resource = graph.resource_dict[candidate]
resource = task.graph.resource_dict[candidate]
except KeyError:
resource = FileSystem(graph, candidate)
resource = FileSystem(task.graph, candidate)

# bind the task to the appropriate data structure
# depending on whether this task creates this resource or
# depends on this resource.
if creates_or_depends == 'creates':
resource.add_creates_task(task)
else:
resource.add_depends_task(task)

resources.append(resource)
return resources


def add_to_task(task):
"""This factory function adds the appropriate `creates` or `depends`
resources for the specified task.
"""
# instantiate the resources associated with this task here
# to make sure we can resolve aliases if they exist.
get_or_create(task, task.depends_list, 'depends')
if not task.is_pseudotask():
get_or_create(task, task.creates_list, 'creates')


def find_regex_matches(graph, regex_str):
"""This function returns a list of all filenames matching regex_str
in the form of a regular expression match.
"""
    # the easiest way to check whether the regex limitations are
    # satisfied is to check, after the fact, whether the groupdict
    # matches. There may be more clever ways of recompiling the regex,
    # but none have been posted yet
    # http://stackoverflow.com/q/23154978/564709
    limitations = graph.regex_limitations

    regex_match_list = []
    regex = re.compile(regex_str)
    # NOTE: this os.walk is probably the stupidest possible way to do
    # this. I'm sure there's a more efficient way to address this
    # issue, but I'm not going to worry about that yet.
    #
    # NOTE: This only works for files right now. no support for
    # directories (for better or worse) and it's not terribly obvious
    # whether/how this extends to other resource protocols (e.g.,
    # mysql:database/table)
    filesystem_crawler = os.walk(graph.root_directory, followlinks=True)
    for directory, dirnames, filenames in filesystem_crawler:
        for filename in filenames:
            abs_filename = os.path.join(directory, filename)
            rel_filename = os.path.relpath(abs_filename, graph.root_directory)
            match = regex.match(rel_filename)
            if match:
                if (not limitations or match.groupdict() == limitations):
                    regex_match_list.append(match)
    return regex_match_list
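
To see what the limitation check at the end does, here is a small
self-contained sketch. The relative paths and the `limitations` values
are hypothetical; the point is that a match is only kept when its
`groupdict()` agrees with what was specified on the command line:

```python
import re

regex = re.compile(r"data/img/(?P<subdirectory>\w+)/(?P<rootname>\w+).jpg")

# as if `--subdirectory abstract --rootname some_image` had been passed
# on the command line (values hypothetical)
limitations = {'subdirectory': 'abstract', 'rootname': 'some_image'}

for rel_filename in ['data/img/abstract/some_image.jpg',
                     'data/img/artphoto/other_image.jpg']:
    match = regex.match(rel_filename)
    if match and (not limitations or match.groupdict() == limitations):
        # only the first path survives the filter
        print(rel_filename)
```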
16 changes: 16 additions & 0 deletions workflow/resources/base.py
@@ -25,9 +25,25 @@ def __init__(self, graph, name):
        )
        self.graph.resource_dict[name] = self

        # these are data structures that are used to track tasks that
        # have dependencies through this resource
        self.creates_task = None
        self.depends_tasks = []

    def __repr__(self):
        return self.name + ':' + str(id(self))

    def add_creates_task(self, task):
        if self.creates_task is not None:
            msg = "Resource '%s' is created by more than one task" % self.name
            raise NonUniqueTask(msg)
        self.creates_task = task
        task.creates_resources.append(self)

    def add_depends_task(self, task):
        self.depends_tasks.append(task)
        task.depends_resources.append(self)

    @property
    def root_directory(self):
        """Easy access to the graph's root_directory, which is stored once for
3 changes: 3 additions & 0 deletions workflow/resources/file_system.py
@@ -10,6 +10,9 @@ class FileSystem(BaseResource):

    def __init__(self, *args, **kwargs):
        super(FileSystem, self).__init__(*args, **kwargs)
        # REGEXP TODO: make sure this is *not* an absolute path. these
        # are assumed to be relative paths by the
        # resources.find_regex_matches function
        self.resource_path = os.path.realpath(
            os.path.join(self.root_directory, self.name)
        )