Logo Search packages:      
Sourcecode: jack version File versions  Download package

jack_main_loop.py

# -*- coding: iso-8859-15 -*-
### jack_main_loop: the main encoding loop for
### jack - extract audio from a CD and encode it using 3rd party software
### Copyright (C) 1999-2004  Arne Zellentin <zarne@users.sf.net>

### This program is free software; you can redistribute it and/or modify
### it under the terms of the GNU General Public License as published by
### the Free Software Foundation; either version 2 of the License, or
### (at your option) any later version.

### This program is distributed in the hope that it will be useful,
### but WITHOUT ANY WARRANTY; without even the implied warranty of
### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
### GNU General Public License for more details.

### You should have received a copy of the GNU General Public License
### along with this program; if not, write to the Free Software
### Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

import string
import signal
import select
import time
import sys
import os

import jack_functions
import jack_ripstuff
import jack_encstuff
import jack_children
import jack_display
import jack_helpers
import jack_targets
import jack_workers
import jack_status
import jack_utils
import jack_misc
import jack_term

from jack_globals import *

def main_loop(mp3s_todo, wavs_todo, space, dae_queue, enc_queue, track1_offset):
    global_error = 0    # remember if something went wrong
    actual_load = -2    # this is always smaller than max_load
    waiting_load = 0    # are we waiting for the load to drop?
    waiting_space = 0   # are we waiting for disk space to be freed?
    space_waiting = 0   # how much space _running_ subprocesses will consume
    space_adjust = 0    # by how much space has been modified
    blocked = 0         # we _try_ do detect deadlocks
    cycles = 0          # it's sort of a timer
    last_update = 0     # screen updates are done once per second
    pause = 0           # no new encoders are started if pause==1
    flags = "[   ]"     # runtime controllable flags
    enc_running = 0     # what is going on?
    dae_running = 0     # what is going on?

    rotate="/-\\|"
    rotate_ball=" .o0O0o."
    rot_cycle = len(rotate)
    rot_ball_cycle = len(rotate_ball)
    rot_count = 0
    global_done = 0 
    first_encoder = 1
    ext = jack_targets.targets[jack_helpers.helpers[cf['_encoder']]['target']]['file_extension']
    global_blocks = jack_functions.tracksize(wavs_todo)[BLOCKS] + jack_functions.tracksize(mp3s_todo)[BLOCKS]


                           #####################
                             ### MAIN LOOP ###
                           #####################

    global_start = time.time()
    while mp3s_todo or enc_queue or dae_queue or enc_running or dae_running:

                            # feed in the WAVs which have been there from the start

        if mp3s_todo and jack_functions.tracksize(mp3s_todo[0])[ENC] < space:
            waiting_space = 0
            enc_queue.append(mp3s_todo[0])
            space = space - jack_functions.tracksize(mp3s_todo[0])[ENC]
            jack_status.enc_stat_upd(mp3s_todo[0][NUM], "waiting for encoder.")
            mp3s_todo = mp3s_todo[1:]

                                                    # start new DAE subprocess

        elif (len(enc_queue) + enc_running) < (cf['_read_ahead'] + cf['_encoders']) and dae_queue and dae_running < cf['_rippers'] and ((jack_functions.tracksize(dae_queue[0])[BOTH] < space) or (cf['_only_dae'] and jack_functions.tracksize(dae_queue[0])[WAV] < space) or (cf['_otf'] and jack_functions.tracksize(dae_queue[0])[ENC] < space)):
            waiting_space = 0
            this_is_ok = 1
            if pause:
                this_is_ok = 0
                jack_status.dae_stat_upd(dae_queue[0][NUM], "Paused. Press 'c' to continue.")
            elif cf['_rip_from_device']:
                all_tracks_on_cd = jack_functions.gettoc(cf['_toc_prog'])
                if not jack_utils.cmp_toc_cd(jack_ripstuff.all_tracks_orig, all_tracks_on_cd, what=(NUM, LEN)):
                    while dae_queue:
                        track = dae_queue[0]
                        dae_queue = dae_queue[1:]
                        jack_status.dae_stat_upd(track[NUM], "Wrong disc - aborting this track")
                    global_error = global_error + 1
                    this_is_ok = 0
            if this_is_ok:
                if cf['_only_dae']:
                    space_waiting = space_waiting + jack_functions.tracksize(dae_queue[0])[WAV]
                    space = space - jack_functions.tracksize(dae_queue[0])[WAV]
                elif cf['_otf']:
                    space_waiting = space_waiting + jack_functions.tracksize(dae_queue[0])[ENC]
                    space = space - jack_functions.tracksize(dae_queue[0])[ENC]
                else:
                    space_waiting = space_waiting + jack_functions.tracksize(dae_queue[0])[BOTH]
                    space = space - jack_functions.tracksize(dae_queue[0])[BOTH]
                dae_running = dae_running + 1
                track = dae_queue[0]
                dae_queue = dae_queue[1:]
                if cf['_otf']:
                    # note: image_reader can't do otf at the moment.
                    jack_status.dae_stat_upd(track[NUM], ":DAE: waiting for status report...")
                    if cf['_encoder'] in ("lame", "gogo", "flac", "mppenc"):
                        jack_status.enc_stat_upd(track[NUM], "[no otf status for %s]" % cf['_encoder'])
                    else:
                        jack_status.enc_stat_upd(track[NUM], "waiting for encoder.")
                    enc_running = enc_running + 1
                    if first_encoder:
                        first_encoder = 0
                        global_start = time.time()
                    data = jack_workers.start_new_otf(track, cf['_ripper'], cf['_encoder'])
                    jack_children.children.append(data['rip'])
                    jack_children.children.append(data['enc'])
                else:
                    if jack_status.enc_status[track[NUM]]:
                        jack_status.enc_cache[track[NUM]] = jack_status.enc_status[track[NUM]]
                        jack_status.enc_stat_upd(track[NUM], "[...]")
                    jack_status.dae_stat_upd(track[NUM], ":DAE: waiting for status report...")
                    if cf['_rip_from_device']:
                        jack_children.children.append(jack_workers.start_new_ripper(track, cf['_ripper']))
                    elif cf['_image_file']:
                        jack_children.children.append(jack_workers.ripread(track, track1_offset))
                    else:
                        jack_status.dae_stat_upd(track[NUM], ":?AE: don't know how to rip this!")

                                            # start new encoder subprocess

        if enc_queue and enc_running < cf['_encoders']:
            if jack_functions.tracksize(enc_queue[0])[ENC] <= space + space_waiting:
                waiting_space = 0
                actual_load = jack_misc.loadavg()
                if actual_load < cf['_max_load']:
                    waiting_load = 0
                    enc_running = enc_running + 1
                    track = enc_queue[0]
                    enc_queue = enc_queue[1:]
                    jack_status.enc_stat_upd(track[NUM], "waiting for encoder...")
                    jack_children.children.append(jack_workers.start_new_encoder(track, cf['_encoder']))
                    if first_encoder:
                        first_encoder = 0
                        global_start = time.time()
                else:
                    waiting_load = 1

                                            # check for subprocess output

        readfd=[sys.stdin.fileno()]
        for i in jack_children.children:
            readfd.append(i['fd'])
        try:
            rfd, wfd, xfd = select.select(readfd, [], [], cf['_update_interval'])
        except:
            rfd, wfd, xfd = [], [], []
            jack_term.tmod.sig_winch_handler(None, None)

                                            # check for keyboard commands

        if sys.stdin.fileno() in rfd:
            last_update = last_update - cf['_update_interval']
            cmd = jack_term.tmod.getkey()
            sys.stdin.flush()
            if string.upper(cmd) == "Q":
                jack_display.exit()
            elif not pause and string.upper(cmd) == "P":
                pause = 1
                flags = flags[:1] + "P" + flags[2:]
            elif string.upper(cmd) == "C" or pause and string.upper(cmd) == "P":
                pause = 0
                flags = flags[:1] + " " + flags[2:]
            elif not flags[3] == "e" and string.upper(cmd) == "E":
                for i in jack_children.children:
                    if i['type'] == "encoder":
                        os.kill(i['pid'], signal.SIGSTOP)
                        flags = flags[:3] + "e" + flags[4:]
            elif flags[3] == "e" and string.upper(cmd) == "E":
                for i in jack_children.children:
                    if i['type'] == "encoder":
                        os.kill(i['pid'], signal.SIGCONT)
                        flags = flags[:3] + " " + flags[4:]
            elif not flags[2] == "r" and string.upper(cmd) == "R":
                for i in jack_children.children:
                    if i['type'] == "ripper":
                        os.kill(i['pid'], signal.SIGSTOP)
                        flags = flags[:2] + "r" + flags[3:]
            elif flags[2] == "r" and string.upper(cmd) == "R":
                for i in jack_children.children:
                    if i['type'] == "ripper":
                        os.kill(i['pid'], signal.SIGCONT)
                        flags = flags[:2] + " " + flags[3:]
            elif string.upper(cmd) == "U":
                cycles = 29     # do periodic stuff _now_
            else:
                jack_term.tmod.move_pad(cmd)
                if cmd == 'KEY_RESIZE':
                    continue
                last_update = time.time()

        # read from file with activity
        for i in jack_children.children:
            if i['fd'] in rfd:
                if os.uname()[0] == "Linux" and i['type'] != "image_reader":
                    try:
                        x = i['file'].read()
                    except IOError:
                        pass
                else:
                    read_chars = 0
                    x = ""
                    while read_chars < jack_helpers.helpers[i['prog']]['status_blocksize']:
                        try:
                            xchar = i['file'].read(1)
                        except IOError:
                            break
                        x = x + xchar
                        read_chars = read_chars + 1
                        try:
                            rfd2, wfd2, xfd2 = select.select([i['fd']], [], [], 0.0)
                        except:
                            rfd2, wfd2, xfd2 = [], [], []
                            jack_term.tmod.sig_winch_handler(None, None)
                        if i['fd'] not in rfd2:
                            break
                # put read data into child's buffer
                i['buf'] = i['buf'] + x

                if jack_helpers.helpers[i['prog']].has_key('filters'):
                    for fil in jack_helpers.helpers[i['prog']]['filters']:
                        i['buf'] = fil[0].sub(fil[1], i['buf'])

                i['buf'] = i['buf'][-jack_helpers.helpers[i['prog']]['status_blocksize']:]

        # check for exiting child processes
        if jack_children.children:
            respid, res = os.waitpid(-1, os.WNOHANG)
            if respid != 0:
                last_update = last_update - cf['_update_interval'] # ensure info is printed
                new_ch = []
                exited_proc = []
                for i in jack_children.children:
                    if i['pid'] == respid:
                        if exited_proc != []:
                            error("pid " + `respid` + " found at multiple child processes")
                        exited_proc = i
                    else:
                        new_ch.append(i)
                if not exited_proc:
                    error("unknown process (" + `respid` + ") has exited")
                jack_children.children = new_ch
                x = ""
                try:
                    x = exited_proc['file'].read()
                except IOError:
                    pass
                except ValueError:
                    pass
                exited_proc['buf'] = (exited_proc['buf'] + x)[-jack_helpers.helpers[exited_proc['prog']]['status_blocksize']:]
                exited_proc['file'].close()

                global_error = global_error + res
                track = exited_proc['track']
                num = track[NUM]
                stop_time = time.time()
                speed = ( track[LEN] / float(CDDA_BLOCKS_PER_SECOND)) / ( stop_time - exited_proc['start_time'] )

                if exited_proc['type'] in ("ripper", "image_reader"):
                    dae_running = dae_running - 1
                    if cf['_exec_when_done'] and exited_proc['type'] == "ripper" and dae_running == 0 and len(dae_queue) == 0:
                        os.system(cf['_exec_rip_done'])
                    if not res:
                        if not exited_proc['otf']:
                            if os.path.exists(track[NAME] + ".wav"):
                                if jack_functions.tracksize(track)[WAV] != jack_utils.filesize(track[NAME] + ".wav"):
                                    res = 242
                                    jack_status.dae_stat_upd(num, jack_status.get_2_line(exited_proc['buf']))
                            else:
                                jack_status.dae_stat_upd(num, jack_status.get_2_line(exited_proc['buf']))
                                res = 243
                            global_error = global_error + res
                    if res and not cf['_sloppy']:
                        if os.path.exists(track[NAME] + ".wav"):
                            os.remove(track[NAME] + ".wav")
                            space = space + jack_functions.tracksize(track)[WAV]
                            if cf['_otf']:
                                os.kill(exited_proc['otf-pid'], signal.SIGTERM)
                                if os.path.exists(track[NAME] + ext):
                                    os.remove(track[NAME] + ext)
                                space = space + jack_functions.tracksize(track)[ENC]
                            if not cf['_otf'] and not cf['_only_dae'] and track not in jack_encstuff.mp3s_ready:
                                space = space + jack_functions.tracksize(track)[ENC]
                            jack_status.dae_stat_upd(num, 'DAE failed with status ' + `res` + ", wav removed.")
                    else:
                        if exited_proc['type'] == "image_reader":
                            jack_status.dae_stat_upd(num, jack_status.get_2_line(exited_proc['buf']))
                        else:
                            if exited_proc['otf'] and jack_helpers.helpers[exited_proc['prog']].has_key('otf-final_status_fkt'):
                                exec(jack_helpers.helpers[exited_proc['prog']]['otf-final_status_fkt']) in globals(), locals()
                            else:
                                last_status = None   # (only used in cdparanoia)
                                exec(jack_helpers.helpers[exited_proc['prog']]['final_status_fkt']) in globals(), locals()
                            jack_status.dae_stat_upd(num, final_status)
                        if jack_status.enc_cache[num]:
                            jack_status.enc_stat_upd(num, jack_status.enc_cache[num])
                            jack_status.enc_cache[num] = ""
                        jack_functions.progress(num, "dae", jack_status.dae_status[num])
                        if not cf['_otf'] and not cf['_only_dae'] and track not in jack_encstuff.mp3s_ready:
                            if waiting_space:
                                mp3s_todo.append(track)
                                space = space + jack_functions.tracksize(track)[ENC]
                            else:
                                jack_status.enc_stat_upd(num, 'waiting for encoder.')
                                enc_queue.append(track)
                    space_waiting = space_waiting - jack_functions.tracksize(track)[WAV]

                elif exited_proc['type'] == "encoder":
                    enc_running = enc_running - 1
                    # completed vbr files shouldn't be to small, but this still
                    # caused confusion so again, vbr is an exception:
                    if not cf['_vbr'] and not res and jack_functions.tracksize(track)[ENC] * 0.99 > jack_utils.filesize(track[NAME] + ext):
                        res = 242
                        global_error = global_error + res
                    if res:
                        global_blocks = global_blocks - exited_proc['track'][LEN]
                        global_start = global_start + exited_proc['elapsed'] / (enc_running + 1)
                        if global_start > time.time():
                            global_start = time.time()
                        if os.path.exists(track[NAME] + ext):
                            # mp3enc doesn't report errors when out of disk space...
                            os.remove(track[NAME] + ext)
                        space = space + jack_functions.tracksize(track)[ENC]
                        jack_status.enc_stat_upd(num, 'coding failed, err#' + `res`)
                    else:
                        global_done = global_done + exited_proc['track'][LEN]
                        if cf['_vbr']:
                            jack_status.enc_stat_upd(num, "[coding @" + '%s' % jack_functions.pprint_speed(speed) + "x done, %03.0fkbit]" % ((jack_utils.filesize(track[NAME] + ext) * 0.008) / (track[LEN] / 75.0)))
                        else:
                            jack_status.enc_stat_upd(num, "[coding @" + '%s' % jack_functions.pprint_speed(speed) + "x done, mp3 OK]")
                        if not cf['_otf'] and not cf['_keep_wavs']:
                            os.remove(track[NAME] + ".wav")
                            space = space + jack_functions.tracksize(track)[WAV]
                        jack_functions.progress(num, "enc", `track[RATE]`, jack_status.enc_status[num])

                else:
                    error("child process of unknown type (" + exited_proc['type'] + ") exited")
                if global_error:
                    jack_display.smile = " :-["

        if last_update + cf['_update_interval'] <= time.time():
            last_update = time.time()

                                                # interpret subprocess output

            for i in jack_children.children:
                if i['type'] == "ripper":
                    if len(i['buf']) == jack_helpers.helpers[i['prog']]['status_blocksize']:
                        if i['otf'] and jack_helpers.helpers[i['prog']].has_key('otf-status_fkt'):
                            exec(jack_helpers.helpers[i['prog']]['otf-status_fkt']) in globals(), locals()
                        else:
                            exec(jack_helpers.helpers[i['prog']]['status_fkt']) in globals(), locals()
                        if new_status:
                            jack_status.dae_stat_upd(i['track'][NUM], ":DAE: " + new_status)
        
                elif i['type'] == "encoder":
                    if len(i['buf']) == jack_helpers.helpers[i['prog']]['status_blocksize']:
                        tmp_d = {'i': i.copy(), 'percent': 0}
                        try:
                            exec(jack_helpers.helpers[i['prog']]['percent_fkt']) in globals(), tmp_d
                        except:
                            tmp_d['percent'] = 0
                            debug("error in percent_fkt of %s." % `i`)
                        i['percent'] = tmp_d['percent']
                        if i['percent'] > 0:
                            i['elapsed'] = time.time() - i['start_time']
                            speed = ((i['track'][LEN] / float(CDDA_BLOCKS_PER_SECOND)) * ( i['percent'] / 100 )) / i['elapsed']
                            eta = (100 - i['percent']) * i['elapsed'] / i['percent']
                            eta_ms = "%02i:%02i" % (eta / 60, eta % 60)
                            jack_status.enc_stat_upd(i['track'][NUM], '%2i%% done, ETA:%6s, %sx' % (i['percent'], eta_ms, jack_functions.pprint_speed(speed)))
                            #jack_term.tmod.dae_stat_upd(i['track'][NUM], None, i['percent'])
        
                elif i['type'] == "image_reader":
                    line = string.strip(jack_status.get_2_line(i['buf']))
                    jack_status.dae_stat_upd(i['track'][NUM], line)
                    if line.startswith("Error"):
                        global_error = global_error + 1
        
                else:
                    error("unknown subprocess type \"" + i['type'] + "\".")

            cycles = cycles + 1
            if cycles % 30 == 0:
                if cf['_recheck_space'] and not cf['space_from_argv']['history'][-1][0] == "argv":
                    actual_space = jack_functions.df()
                    if space_adjust:
                        diff = actual_space - space
                        if diff > space_adjust:
                            space = space + space_adjust
                            space_adjust = 0
                            waiting_space = 0
                        else:
                            space = space + diff
                            space_adjust = space_adjust - diff
                    else:
                        if actual_space < space:
                            space_adjust = space - actual_space
                            space = actual_space

            if space_adjust and enc_running == 0 and dae_running == 0:
                waiting_space = waiting_space + 1
            if not waiting_space >= 2 and not waiting_load and enc_running == 0 and dae_running == 0:
                blocked = blocked + 1
            else:
                blocked = 0

            total_done = global_done
            for i in jack_children.children: 
                total_done = total_done + (i['percent'] / 100) * i['track'][LEN]
            elapsed = time.time() - global_start
            if global_blocks > 0:
                percent = total_done / global_blocks
            else:
                percent = 0
            if percent > 0 and elapsed > 40:
                eta = ((1 - percent) * elapsed / percent)
                eta_hms = " ETA=%i:%02i:%02i" % (eta / 3600, (eta % 3600) / 60, eta % 60)
            else:
                eta_hms = ""

            if string.strip(flags[1:-1]):
                print_flags = " " + flags
            else:
                print_flags = ""
            if dae_running:
                rot = rotate_ball[rot_count % rot_ball_cycle]
            else:
                rot = rotate[rot_count % rot_cycle]
            rot_count = rot_count + 1

                                                            # print status

            if blocked > 2:
                jack_display.special_line = " ...I feel blocked - quit with 'q' if you get bored... "
                if blocked > 5:
                    space = jack_functions.df() - cf['_keep_free']
            elif waiting_load and waiting_space >= 2:
                jack_display.special_line = " ...waiting for load (%.2f)" % actual_load + ") < %.2f" % cf['_max_load'] + " and for " + jack_functions.pprint_i(space_adjust, "%i %sBytes") + " to be freed... "
            elif waiting_space >= 2:
                jack_display.special_line = " ...waiting for " + jack_functions.pprint_i(space_adjust, "%i %sBytes") + " to be freed.... "
            elif waiting_load:
                jack_display.special_line = " ...waiting for load (%.2f) to drop below %.2f..." % (actual_load, cf['_max_load'])
            else:
                jack_display.special_line = None

            jack_display.bottom_line =  "(" + rot + ") " \
                + "SPACE:" * (space_adjust != 0) \
                + "space:" * (space_adjust == 0) \
                + jack_functions.pprint_i(space, "%i%sB") \
                + (" waiting_WAVs:%02i" % len(enc_queue)) \
                + " DAE:" + `cf['_rippers'] - dae_running` + "+" + `dae_running` \
                + " ENC:" + `cf['_encoders'] - enc_running` + "+" + `enc_running` \
                + eta_hms \
                + " errors: " + `global_error` \
                + jack_display.smile + print_flags
            
            jack_term.tmod.update(jack_display.special_line, jack_display.bottom_line)

    return global_error

# end of main loop #########################################################

Generated by  Doxygen 1.6.0   Back to index