From c0dc302737d610230be50db31ad7b495e7a46644 Mon Sep 17 00:00:00 2001 From: Rasmus Ringdahl <rasmus.ringdahl@liu.se> Date: Thu, 6 Feb 2025 09:49:04 +0100 Subject: [PATCH] refactor: streamline example 3 --- 3_job_array/README.md | 22 ++++----- 3_job_array/config.txt | 7 +-- 3_job_array/data/parameters_first_job.txt | 1 - 3_job_array/data/parameters_second_job.txt | 1 - 3_job_array/data/parameters_third_job.txt | 1 - 3_job_array/job_array.sh | 4 +- 3_job_array/job_array_task copy.py | 55 ++++++++++++++++++++++ 3_job_array/job_array_task.py | 38 ++++++++++----- data/data_1.txt | 1 + data/data_2.txt | 1 + data/data_3.txt | 1 + data/data_4.txt | 1 + 12 files changed, 99 insertions(+), 34 deletions(-) delete mode 100644 3_job_array/data/parameters_first_job.txt delete mode 100644 3_job_array/data/parameters_second_job.txt delete mode 100644 3_job_array/data/parameters_third_job.txt create mode 100644 3_job_array/job_array_task copy.py create mode 100644 data/data_1.txt create mode 100644 data/data_2.txt create mode 100644 data/data_3.txt create mode 100644 data/data_4.txt diff --git a/3_job_array/README.md b/3_job_array/README.md index 62c3a27..d1ae2b4 100644 --- a/3_job_array/README.md +++ b/3_job_array/README.md @@ -7,25 +7,23 @@ To run the example do the following steps: 2. Change directory to the example code 3. Run `sbatch job_array.sh` 4. Check queue status by running `squeue` -5. When the job is completed check the file _job_array_XXX_YY.log_ - -Try to extend the job array with one more run by adding a new file in the _data_ folder and update the _config.txt_ file. +5. When the job is completed check the file _job_array_<JOB_ID>_<ARRAY_TASK_ID>.log_ ## Detailed description of the example -The batch script is the main file for the job allocation and preparation. Inside the python script a few environmental variables are fetched and printed out. Furthermore there are a folder, _data_ that contains some figurative input data and a _config.txt_ file that maps the array to the input data. +The batch script is the main file for the job allocation and preparation. Inside the python script a few environmental variables are fetched and printed out. Furthermore there is a _config.txt_ file that maps the array to the input data. ### The batch script The batch script, _job_array.sh_, contains four sections. The first section contains input arguments to the Slurm scheduler. The second section loads Python into environment so it is accessible. In the third step a the _config.txt_ file is read and the filename of the file corresponding to the array index is stored. Lastly the job step is performed with the relevant filename as input argument. The input arguments are defined with a comment beginning with SBATCH followed by the argument key and value. For easier readablility the -- method is used. -- __job-name:__ The name of the job is set to _demo_job_array_ -- __time:__ The requeted time is set to 5 minutes, _00:05:00_ -- __ntasks:__ The number of tasks to be performed in this job is set to _1_. -- __cpus-per-task:__ The requested number of cores per task is set to _2_ -- __mem:__ The requested memory is set to _50 MB_ -- __output:__ The standard output should be sent to the file _job_array_%A_%a.log__, the %A will expand to the job number and %a will expand to the array index. -- __array:__ The array is set t _1-3_. This represents a list of array ids that should be created. Each id will be a separate job. The array can be of any numbering that suites the user. +- __job-name:__ The name of the job +- __time:__ The requeted time +- __ntasks:__ The number of tasks to be performed in this job +- __cpus-per-task:__ The requested number of cpus per task +- __mem-per-cpu:__ The requested memory adjusted per the number of cpu's +- __output:__ File name for standard output +- __array:__ The ids of the array. _Note: Multiple similar jobs will be run and output files need to be handled in a way so they are not overwritten._ @@ -36,8 +34,6 @@ The job step with the single task is allocated and performed with the __srun__ c #### The configuration file and data files The _config.txt_ is a text file containing a simple table, the first column contains a the array index and the second column contains the filepath to the data file to be loaded into the job. It is importat that the index in the file matches the _--array_ argument. -The data files in this example is a simple json object but could be a CSV-file or other file formats. - For simpler applications the data files could be ignored and the _config.txt_ contains all relevant data. #### The python script diff --git a/3_job_array/config.txt b/3_job_array/config.txt index b017082..3073861 100644 --- a/3_job_array/config.txt +++ b/3_job_array/config.txt @@ -1,5 +1,6 @@ task file -1 data/parameters_first_job.txt -2 data/parameters_second_job.txt -3 data/parameters_third_job.txt +1 ../data/data_1.txt +2 ../data/data_2.txt +3 ../data/data_3.txt +4 ../data/data_4.txt diff --git a/3_job_array/data/parameters_first_job.txt b/3_job_array/data/parameters_first_job.txt deleted file mode 100644 index 157fbdd..0000000 --- a/3_job_array/data/parameters_first_job.txt +++ /dev/null @@ -1 +0,0 @@ -{"name": "First file", "sleep": [2,12,7,4,4]} \ No newline at end of file diff --git a/3_job_array/data/parameters_second_job.txt b/3_job_array/data/parameters_second_job.txt deleted file mode 100644 index b87fdb1..0000000 --- a/3_job_array/data/parameters_second_job.txt +++ /dev/null @@ -1 +0,0 @@ -{"name": "Second file", "sleep": [3,10,4,11,2]} \ No newline at end of file diff --git a/3_job_array/data/parameters_third_job.txt b/3_job_array/data/parameters_third_job.txt deleted file mode 100644 index 35894ce..0000000 --- a/3_job_array/data/parameters_third_job.txt +++ /dev/null @@ -1 +0,0 @@ -{"name": "Third file", "sleep": [12,3,14,10,20]} \ No newline at end of file diff --git a/3_job_array/job_array.sh b/3_job_array/job_array.sh index f448025..11353b1 100644 --- a/3_job_array/job_array.sh +++ b/3_job_array/job_array.sh @@ -5,7 +5,7 @@ #SBATCH --cpus-per-task=2 #SBATCH --mem-per-cpu=50MB #SBATCH --output=job_array_%A_%a.log -#SBATCH --array=1-3 +#SBATCH --array=1-4 # Loading Python into the environment module load python/anaconda3-2024.02-3.11.7 @@ -17,4 +17,4 @@ config=config.txt file=$(awk -v task=$SLURM_ARRAY_TASK_ID '$1==task {print $2}' $config) # Start job stage -srun python job_array_task.py ${file} \ No newline at end of file +srun python job_array_task.py ${file} output_${SLURM_ARRAY_TASK_ID}.csv \ No newline at end of file diff --git a/3_job_array/job_array_task copy.py b/3_job_array/job_array_task copy.py new file mode 100644 index 0000000..e888618 --- /dev/null +++ b/3_job_array/job_array_task copy.py @@ -0,0 +1,55 @@ +from datetime import datetime +from multiprocessing import Pool + +import json +import logging +import os +import sys +import time + +logger = logging.getLogger(__name__) + +def sleep(input): + time.sleep(input[1]) + logger.info('Task %d done.',input[0]) + +def main(filename: str): + # Read environment variables. + NUMBER_OF_CORES = os.environ.get('SLURM_CPUS_PER_TASK','Unknown') + if NUMBER_OF_CORES in 'Unknown': + logger.error('Unkown number of cores, exiting.') + return + + NUMBER_OF_CORES = int(NUMBER_OF_CORES) + logger.info('Running program with %d cores.',NUMBER_OF_CORES) + + # Reading configuration file and create a list of tasks + # This represents the reading of parameters and calculations + logger.info('Reading configuration from %s.',filename) + with open(filename, 'r') as file: + data = json.load(file) + + tasks = [] + total_time = 0 + for i in range(len(data['sleep'])): + time = data['sleep'][i] + tasks.append((i, time)) + total_time = total_time + time + + # Creating a multiprocessing pool to perform the tasks + pool = Pool(processes=NUMBER_OF_CORES) + + # Running submitting the tasks to the worker pool + tic = datetime.now() + logger.info('Submitting tasks to pool.') + pool.map(sleep, tasks) + toc = datetime.now() + + logger.info('All tasks are done, took %d seconds, compared to %d seconds with single thread.', + (toc-tic).seconds, total_time) + + +if __name__ == '__main__': + logging.basicConfig(level=logging.INFO) + filename = sys.argv[1] + main(filename) diff --git a/3_job_array/job_array_task.py b/3_job_array/job_array_task.py index e888618..b808369 100644 --- a/3_job_array/job_array_task.py +++ b/3_job_array/job_array_task.py @@ -9,24 +9,29 @@ import time logger = logging.getLogger(__name__) -def sleep(input): +def sleep(input) -> int: time.sleep(input[1]) logger.info('Task %d done.',input[0]) -def main(filename: str): + return input[1] + +def main(input_file: str, output_file: str): # Read environment variables. - NUMBER_OF_CORES = os.environ.get('SLURM_CPUS_PER_TASK','Unknown') - if NUMBER_OF_CORES in 'Unknown': - logger.error('Unkown number of cores, exiting.') + JOB_NAME = os.environ.get('SLURM_JOB_NAME','Unknown') + JOB_ID = os.environ.get('SLURM_JOB_ID','Unknown') + NUMBER_OF_CPUS = os.environ.get('SLURM_CPUS_PER_TASK','Unknown') + if NUMBER_OF_CPUS in 'Unknown': + logger.error('Unkown number of cpu''s, exiting.') return - NUMBER_OF_CORES = int(NUMBER_OF_CORES) - logger.info('Running program with %d cores.',NUMBER_OF_CORES) + NUMBER_OF_CPUS = int(NUMBER_OF_CPUS) + logger.info('**** Output for job %s (%s) ****', JOB_NAME, JOB_ID) + logger.info('Running program with %d cpu''s.',NUMBER_OF_CPUS) # Reading configuration file and create a list of tasks # This represents the reading of parameters and calculations - logger.info('Reading configuration from %s.',filename) - with open(filename, 'r') as file: + logger.info('Reading configuration from %s.',input_file) + with open(input_file, 'r') as file: data = json.load(file) tasks = [] @@ -37,19 +42,26 @@ def main(filename: str): total_time = total_time + time # Creating a multiprocessing pool to perform the tasks - pool = Pool(processes=NUMBER_OF_CORES) + pool = Pool(processes=NUMBER_OF_CPUS) # Running submitting the tasks to the worker pool tic = datetime.now() logger.info('Submitting tasks to pool.') - pool.map(sleep, tasks) + results = pool.map(sleep, tasks) toc = datetime.now() logger.info('All tasks are done, took %d seconds, compared to %d seconds with single thread.', (toc-tic).seconds, total_time) + logger.info('Writing result to %s', output_file) + with open(output_file, 'w') as file: + file.write('time\n') + for result in results: + file.write('{}\n'.format(result)) if __name__ == '__main__': logging.basicConfig(level=logging.INFO) - filename = sys.argv[1] - main(filename) + input_file = sys.argv[1] + output_file = sys.argv[2] + main(input_file, output_file) + sys.exit(0) diff --git a/data/data_1.txt b/data/data_1.txt new file mode 100644 index 0000000..689136b --- /dev/null +++ b/data/data_1.txt @@ -0,0 +1 @@ +{"name": "First data file", "sleep": [8,24,14,8,5]} \ No newline at end of file diff --git a/data/data_2.txt b/data/data_2.txt new file mode 100644 index 0000000..f053d92 --- /dev/null +++ b/data/data_2.txt @@ -0,0 +1 @@ +{"name": "Second data file", "sleep": [2,20,8,19,10]} \ No newline at end of file diff --git a/data/data_3.txt b/data/data_3.txt new file mode 100644 index 0000000..db11e7d --- /dev/null +++ b/data/data_3.txt @@ -0,0 +1 @@ +{"name": "Third data file", "sleep": [15,6,18,15,5]} \ No newline at end of file diff --git a/data/data_4.txt b/data/data_4.txt new file mode 100644 index 0000000..0049376 --- /dev/null +++ b/data/data_4.txt @@ -0,0 +1 @@ +{"name": "Forth data file", "sleep": [6,2,6,1,6,2,2,6,2,1,6,1,1,1,2,2,1,1,2,2,1,2,1,1,1]} \ No newline at end of file -- GitLab