Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions biobakery_workflows/biobakery_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@
import os
import subprocess

VERSION = "0.14.0"
from . import config

WORKFLOW_FOLDER="workflows"
WORKFLOW_EXTENSION=".py"

Expand Down Expand Up @@ -63,7 +64,7 @@ def parse_arguments(args,workflows):
parser.add_argument(
"--version",
action="version",
version="%(prog)s v"+VERSION)
version="%(prog)s v"+config.VERSION)
parser.add_argument(
"workflow",
choices=workflows,
Expand Down
2 changes: 2 additions & 0 deletions biobakery_workflows/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import os
import sys

VERSION = "0.14.0"

def get_home_directory():
""" Try to get the users home directory """

Expand Down
20 changes: 20 additions & 0 deletions biobakery_workflows/scripts/gzip_files.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Compress every fastq file in a folder with gzip, optionally fanning out on a grid.
# Usage: python gzip_files.py --input $FOLDER --output $FOLDER_OUT --grid-jobs 100

from anadama2 import Workflow

workflow = Workflow()

# Find the fastq inputs and derive a matching ".gz" target path for each one.
fastq_inputs = workflow.get_input_files(extension=".fastq")
gzip_targets = []
for output_name in workflow.name_output_files(fastq_inputs):
    gzip_targets.append(output_name + ".gz")

# One gridable gzip task per input file, each sized at 5 GB / 3 hours / 1 core.
workflow.add_task_group_gridable(
    "gzip -c [depends[0]] > [targets[0]]",
    depends=fastq_inputs,
    targets=gzip_targets,
    docker_image="biobakery/kneaddata:0.7.2_cloud_v4",
    mem=5 * 1024,
    time=3 * 60,
    cores=1)

workflow.go()

122 changes: 74 additions & 48 deletions biobakery_workflows/tasks/shotgun.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ def kneaddata(workflow, input_files, extension, output_folder, threads, paired=N
kneaddata_output_files = zip(kneaddata_output_fastq, kneaddata_output_logs)

# get the output folder
kneaddata_output_folder = os.path.dirname(kneaddata_output_files[0][0])

rename_final_output = ""
if paired:
Expand All @@ -114,7 +113,8 @@ def kneaddata(workflow, input_files, extension, output_folder, threads, paired=N
time_equation="3*6*60 if file_size('[depends[0]]') < 10 else 5*6*60"
mem_equation="3*12*1024 if file_size('[depends[0]]') < 10 else 6*12*1024"
# need to rename the final output file here to the sample name
rename_final_output = " && mv [args[3]] [targets[0]]"
rename_final_output = " && cp [targets[2]] [targets[0]]"
kneaddata_output_files = zip(kneaddata_output_fastq, kneaddata_output_logs, kneaddata_output_repeats_removed_fastq)

# set additional options to empty string if not provided
if additional_options is None:
Expand Down Expand Up @@ -145,16 +145,17 @@ def kneaddata(workflow, input_files, extension, output_folder, threads, paired=N

# create a task for each set of input and output files to run kneaddata
# rename file with repeats in name to only sample name
for sample, depends, targets, intermediate_file in zip(sample_names, input_files, kneaddata_output_files, kneaddata_output_repeats_removed_fastq):
for sample, depends, targets in zip(sample_names, input_files, kneaddata_output_files):
workflow.add_task_gridable(
"kneaddata --input [depends[0]] --output [args[0]] --threads [args[1]] --output-prefix [args[2]] "+second_input_option+optional_arguments+" "+additional_options+rename_final_output,
"kneaddata --input [depends[0]] --output $(dirname [targets[0]]) --threads [args[0]] --output-prefix [args[1]] "+second_input_option+optional_arguments+" "+additional_options+rename_final_output,
depends=utilities.add_to_list(depends,TrackedExecutable("kneaddata")),
targets=targets,
args=[kneaddata_output_folder, threads, sample, intermediate_file],
args=[threads, sample],
time=time_equation, # 6 hours or more depending on file size
mem=mem_equation, # 12 GB or more depending on file size
cores=threads, # time/mem based on 8 cores
name=utilities.name_task(sample,"kneaddata")) # name task based on sample name
name=utilities.name_task(sample,"kneaddata"),
docker_image="biobakery/kneaddata:0.7.3_cloud_r1") # name task based on sample name

return kneaddata_output_fastq, kneaddata_output_logs

Expand Down Expand Up @@ -192,18 +193,18 @@ def kneaddata_read_count_table(workflow, input_files, output_folder):
workflow.go()
"""

# get the folder of the log files
input_folder=os.path.dirname(input_files[0])

# get the name for the output file
kneaddata_read_count_file = files.ShotGun.path("kneaddata_read_counts",output_folder,create_folder=True)

# add the task (which is not gridable as this task should take under 5 minutes)
workflow.add_task("kneaddata_read_count_table --input [args[0]] --output [targets[0]]",
workflow.add_task_gridable("kneaddata_read_count_table --input $(dirname [depends[0]]) --output [targets[0]]",
depends=input_files,
targets=kneaddata_read_count_file,
args=[input_folder],
name="kneaddata_read_count_table")
time=10,
mem=5*1024,
cores=1,
name="kneaddata_read_count_table",
docker_image="biobakery/kneaddata:0.7.3_cloud_r1")

return kneaddata_read_count_file

Expand Down Expand Up @@ -330,7 +331,6 @@ def taxonomic_profile(workflow,input_files,output_folder,threads,input_extension
main_folder=os.path.join("metaphlan2","main")
metaphlan2_output_files_profile = utilities.name_files(sample_names, output_folder, subfolder=main_folder, tag=metaphlan2_profile_tag, extension="tsv", create_folder=True)
metaphlan2_output_files_sam = utilities.name_files(sample_names, output_folder, subfolder=main_folder, tag="bowtie2", extension="sam")
metaphlan2_output_folder = os.path.dirname(metaphlan2_output_files_profile[0])

# determine the input file type based on the extension
if input_extension in ["fasta","fasta.gz","fa","fa.gz"]:
Expand All @@ -342,39 +342,47 @@ def taxonomic_profile(workflow,input_files,output_folder,threads,input_extension
if not already_profiled:
for sample, depend_fastq, target_profile, target_sam in zip(sample_names, input_files, metaphlan2_output_files_profile, metaphlan2_output_files_sam):
workflow.add_task_gridable(
"metaphlan2.py [depends[0]] --input_type [args[2]] --output_file [targets[0]] --samout [targets[1]] --nproc [args[0]] --no_map --tmp_dir [args[1]]",
"metaphlan2.py [depends[0]] --input_type [args[1]] --output_file [targets[0]] --samout [targets[1]] --nproc [args[0]] --no_map --tmp_dir $(dirname [targets[1]])",
depends=[depend_fastq,TrackedExecutable("metaphlan2.py")],
targets=[target_profile,target_sam],
args=[threads,metaphlan2_output_folder,input_type],
args=[threads,input_type],
time="3*60 if file_size('[depends[0]]') < 25 else 4*3*60", # 3 hours or more depending on input file size
mem="12*1024 if file_size('[depends[0]]') < 25 else 4*12*1024", # 12 GB or more depending on input file size
cores=threads, # time/mem based on 8 cores
name=utilities.name_task(sample,"metaphlan2"))
name=utilities.name_task(sample,"metaphlan2"),
docker_image="biobakery/metaphlan2:2.7.7_cloud_r1")
else:
# set the names of the already profiled outputs
metaphlan2_output_files_profile = input_files
metaphlan2_output_folder = os.path.dirname(input_files[0])

# merge all of the metaphlan taxonomy tables
metaphlan2_merged_output = files.ShotGun.path("taxonomic_profile", output_folder)

# run the humann2 join script to merge all of the metaphlan2 profiles
workflow.add_task(
"humann2_join_tables --input [args[0]] --output [targets[0]] --file_name [args[1]]",
workflow.add_task_gridable(
"humann2_join_tables --input $(dirname [depends[0]]) --output [targets[0]] --file_name [args[0]]",
depends=metaphlan2_output_files_profile,
targets=metaphlan2_merged_output,
args=[metaphlan2_output_folder, metaphlan2_profile_tag],
name="metaphlan2_join_taxonomic_profiles")
args=[metaphlan2_profile_tag],
time=10,
mem=5*1024,
cores=1,
name="metaphlan2_join_taxonomic_profiles",
docker_image="biobakery/humann2:2.8.1_cloud_r1")

# get the name for the file to write the species counts
metaphlan2_species_counts_file = files.ShotGun.path("species_counts",output_folder,create_folder=True)

# create a file of species counts
workflow.add_task(
workflow.add_task_gridable(
"count_features.py --input [depends[0]] --output [targets[0]] --include s__ --filter t__ --reduce-sample-name",
depends=metaphlan2_merged_output,
targets=metaphlan2_species_counts_file,
name="metaphlan2_count_species")
time=10,
mem=5*1024,
cores=1,
name="metaphlan2_count_species",
docker_image="biobakery/workflows:0.13.5_cloud_r1")

return metaphlan2_merged_output, metaphlan2_output_files_profile, metaphlan2_output_files_sam

Expand Down Expand Up @@ -485,8 +493,6 @@ def functional_profile(workflow,input_files,extension,output_folder,threads,taxo
# get the name for the file of read and species counts created from the humann2 log outputs
log_counts = files.ShotGun.path("humann2_read_counts",output_folder,create_folder=True)

humann2_output_folder = os.path.dirname(genefamiles[0])

# if taxonomic profiles are provided, add these to the targets and the command option
if taxonomic_profiles:
optional_profile_args=" --taxonomic-profile [depends[1]] "
Expand All @@ -505,22 +511,26 @@ def functional_profile(workflow,input_files,extension,output_folder,threads,taxo
# create a task to run humann2 on each of the kneaddata output files
for sample, depend_fastq, target_gene, target_path, target_coverage, target_log in zip(sample_names, depends, genefamiles, pathabundance, pathcoverage, log_files):
workflow.add_task_gridable(
"humann2 --input [depends[0]] --output [args[0]] --o-log [targets[3]] --threads [args[1]]"+optional_profile_args,
"humann2 --input [depends[0]] --output $(dirname [targets[0]]) --o-log [targets[3]] --threads [args[0]]"+optional_profile_args,
depends=utilities.add_to_list(depend_fastq,TrackedExecutable("humann2")),
targets=[target_gene, target_path, target_coverage, target_log],
args=[humann2_output_folder, threads],
args=[threads],
time="24*60 if file_size('[depends[0]]') < 25 else 6*24*60", # 24 hours or more depending on file size
mem="32*1024 if file_size('[depends[0]]') < 25 else 3*32*1024", # 32 GB or more depending on file size
cores=threads,
name=utilities.name_task(sample,"humann2"))
name=utilities.name_task(sample,"humann2"),
docker_image="biobakery/humann2:2.8.1_cloud_r1")

# create a task to get the read and species counts for each humann2 run from the log files
workflow.add_task(
"get_counts_from_humann2_logs.py --input [args[0]] --output [targets[0]]",
workflow.add_task_gridable(
"get_counts_from_humann2_logs.py --input $(dirname [depends[0]]) --output [targets[0]]",
depends=log_files,
targets=log_counts,
args=humann2_output_folder,
name="humann2_count_alignments_species")
time=10, # 10 minutes
mem=5*1024, # 5 GB
cores=1,
name="humann2_count_alignments_species",
docker_image="biobakery/workflows:0.13.5_cloud_r1")

### STEP #2: Regroup UniRef90 gene families to ecs ###

Expand All @@ -537,7 +547,8 @@ def functional_profile(workflow,input_files,extension,output_folder,threads,taxo
time=10, # 10 minutes
mem=5*1024, # 5 GB
cores=1,
name=map(lambda sample: utilities.name_task(sample,"humann2_regroup_UniRef2EC"), sample_names))
name=map(lambda sample: utilities.name_task(sample,"humann2_regroup_UniRef2EC"), sample_names),
docker_image="biobakery/humann2:2.8.1_cloud_r1")


### STEP #3: Merge gene families, ecs, and pathway abundance files
Expand All @@ -552,12 +563,16 @@ def functional_profile(workflow,input_files,extension,output_folder,threads,taxo
all_targets=[merged_genefamilies, merged_ecs, merged_pathabundance]
file_basenames=["genefamilies","ecs","pathabundance"]
for depends, targets, basename in zip(all_depends, all_targets, file_basenames):
workflow.add_task(
"humann2_join_tables --input [args[0]] --output [targets[0]] --file_name [args[1]]",
workflow.add_task_gridable(
"humann2_join_tables --input $(dirname [depends[0]]) --output [targets[0]] --file_name [args[0]]",
depends=depends,
targets=targets,
args=[os.path.dirname(depends[0]),basename],
name="humann2_join_tables_"+basename)
args=[basename],
time=10, # 10 minutes
mem=50*1024 if "gene" in basename else 5*1024, # 50 GB
cores=1,
name="humann2_join_tables_"+basename,
docker_image="biobakery/humann2:2.8.1_cloud_r1")

### STEP #4: Normalize gene families, ecs, and pathway abundance to relative abundance (then merge files) ###

Expand All @@ -578,7 +593,8 @@ def functional_profile(workflow,input_files,extension,output_folder,threads,taxo
time=15, # 15 minutes
mem=5*1024, # 5 GB
cores=1,
name=renorm_task_names)
name=renorm_task_names,
docker_image="biobakery/humann2:2.8.1_cloud_r1")


# get a list of merged files for ec, gene families, and pathway abundance
Expand All @@ -591,31 +607,41 @@ def functional_profile(workflow,input_files,extension,output_folder,threads,taxo
all_targets=[merged_genefamilies_relab, merged_ecs_relab, merged_pathabundance_relab]
all_types=["genes_relab","ecs_relab","pathways_relab"]
for depends, targets, input_type in zip(all_depends, all_targets, all_types):
workflow.add_task(
"humann2_join_tables --input [args[0]] --output [targets[0]]",
workflow.add_task_gridable(
"humann2_join_tables --input $(dirname [depends[0]]) --output [targets[0]]",
depends=depends,
targets=targets,
args=[os.path.dirname(depends[0])],
name="humann2_join_tables_"+input_type)
time=10,
mem=50*1024 if "gene" in input_type else 5*1024,
cores=1,
name="humann2_join_tables_"+input_type,
docker_image="biobakery/humann2:2.8.1_cloud_r1")

# get feature counts for the ec, gene families, and pathways
genefamilies_counts = files.ShotGun.path("genefamilies_relab_counts", output_folder)
ecs_counts = files.ShotGun.path("ecs_relab_counts", output_folder)
pathabundance_counts = files.ShotGun.path("pathabundance_relab_counts", output_folder)
workflow.add_task_group(
workflow.add_task_group_gridable(
"count_features.py --input [depends[0]] --output [targets[0]] --reduce-sample-name --ignore-un-features --ignore-stratification",
depends=[merged_genefamilies_relab, merged_ecs_relab, merged_pathabundance_relab],
targets=[genefamilies_counts, ecs_counts, pathabundance_counts],
name=["humann2_count_features_genes","humann2_count_features_ecs","humann2_count_features_pathways"])
time=10,
mem=5*1024,
cores=1,
name=["humann2_count_features_genes","humann2_count_features_ecs","humann2_count_features_pathways"],
docker_image="biobakery/workflows:0.13.5_cloud_r1")

# merge the feature counts into a single file
all_feature_counts = files.ShotGun.path("feature_counts", output_folder)
workflow.add_task(
"humann2_join_tables --input [args[0]] --output [targets[0]] --file_name _relab_counts.tsv",
workflow.add_task_gridable(
"humann2_join_tables --input $(dirname [depends[0]]) --output [targets[0]] --file_name _relab_counts.tsv",
depends=[genefamilies_counts, ecs_counts, pathabundance_counts],
targets=all_feature_counts,
args=[os.path.dirname(genefamilies_counts)],
name="humann2_merge_feature_counts")
time=10,
mem=5*1024,
cores=1,
name="humann2_merge_feature_counts",
docker_image="biobakery/humann2:2.8.1_cloud_r1")


return merged_genefamilies_relab, merged_ecs_relab, merged_pathabundance_relab, merged_genefamilies, merged_ecs, merged_pathabundance
Expand Down
21 changes: 9 additions & 12 deletions biobakery_workflows/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import time
import collections

from anadama2.tracked import TrackedDirectory
from anadama2.tracked import TrackedDirectory, s3_folder

# try to import urllib.request.urlretrieve for python3
try:
Expand Down Expand Up @@ -570,10 +570,11 @@ def sample_names(files,extension,pair_identifier=None):

return samples

def find_files(folder, extension=None, exit_if_not_found=None):
def find_files(workflow, folder, extension=None, exit_if_not_found=None):
""" Return the files in the given folder with the extension if provided

Args:
workflow (anadama2.Workflow): The workflow instance
folder (string): A path to a folder
extension (string): The file extension to search for (optional)
exit_if_not_found (bool): Indicator to check if files exist (optional)
Expand All @@ -585,16 +586,11 @@ def find_files(folder, extension=None, exit_if_not_found=None):
list: A list of files in the folder

Example:
files = find_files("examples","fastq")
files = find_files(workflow, "examples","fastq")
"""

# get all of the files in the folder
files=[os.path.join(folder,file) for file in os.listdir(folder)]
files=list(filter(lambda file: os.path.isfile(file),files))

# filter to only files with extension
if extension:
files=list(filter(lambda file: file.endswith(extension), files))
# get all of the files in the folder (also allows for inputs from s3)
files=workflow.get_input_files(extension=extension, input_folder=folder)

if exit_if_not_found:
if not files:
Expand Down Expand Up @@ -636,7 +632,8 @@ def name_files(names, folder, subfolder=None, tag=None, extension=None, create_f
names=[os.path.basename(name) for name in names]

# use the full path to the folder
folder=os.path.abspath(folder)
if not s3_folder(folder):
folder=os.path.abspath(folder)

# get the name of the full folder plus subfolder if provided
if subfolder:
Expand All @@ -652,7 +649,7 @@ def name_files(names, folder, subfolder=None, tag=None, extension=None, create_f

files=[os.path.join(folder,name) for name in names]

if create_folder:
if create_folder and not s3_folder:
create_folders(os.path.dirname(files[0]))

# if the input was originally a string, convert from list
Expand Down
6 changes: 3 additions & 3 deletions biobakery_workflows/workflows/16s.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@


# create a workflow instance, providing the version number and description
workflow = Workflow(version="0.1", description="A workflow for 16S sequencing data")
workflow = Workflow(version=config.VERSION, description="A workflow for 16S sequencing data")

# add the custom arguments to the workflow
workflow_config = config.SixteenS()
Expand Down Expand Up @@ -63,10 +63,10 @@

# get all input files with the input extension provided on the command line
# return an error if no files are found
input_files = utilities.find_files(args.input, extension=args.input_extension, exit_if_not_found=True)
input_files = utilities.find_files(workflow, args.input, extension=args.input_extension, exit_if_not_found=True)

# check for index files, do not error if they are not found
index_files = utilities.find_files(args.input, extension=args.index_identifier+"."+args.input_extension)
index_files = utilities.find_files(workflow, args.input, extension=args.index_identifier+"."+args.input_extension)

# remove the index files, if found, from the set of input files
input_files = list(filter(lambda file: not file in index_files, input_files))
Expand Down
Loading