Introduction

My recent project has quite a few tasks that run in the background: generating thumbnails, detecting colours in pictures, running DB updates and so on. All of that is handled with cron and Yii commands. However, there is a little problem. Suppose we have a DB update routine that imports new stock data from a datafeed. The datafeed is uploaded hourly, but the upload can’t be scheduled to the minute, because connection speed and various errors may delay or interrupt it. On top of that, processing sometimes takes 5 minutes and sometimes a few hours: in the first case we just replace records, while in the second we also download images for the new products.

The problem

So if we schedule the update to run every 5 minutes (to check whether a new file is ready), we get the following:

  1. 19:00:00 started update
  2. 19:00:01 finished. No files arrived
  3. 19:05:00 started update
  4. 19:05:02 started DB update (1)
  5. 19:10:00 started update
  6. 19:10:02 started DB update (2)
  7. 19:11:48 finished DB update. Source file removed. (1)
  8. 19:11:50 error updating. File is not readable. Terminating (2)

As a result, the DB will contain wrong data because the records were inserted twice. Sure, we can guard against this with DB constraints, but duplicates are not the only possible error.

Solution overview

The usual way to fix this is to go the Unix way and create a lock file when a process starts. That is a good option, but I decided to use one file for all processes. Of course, I have to lock it while I read from or write to it, and other processes have to wait until I’m done. It is not brilliant, but I decided to go this way, and I’ll show how I did it in this post.
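
For comparison, here is a minimal sketch of the classic one-lock-file-per-task approach mentioned above. It is not part of the extension; the function name acquireTaskLock() and the lock file path are made up for illustration. It relies on flock() with LOCK_NB, so an overlapping run is refused immediately instead of waiting.

```php
<?php
/**
 * Try to take an exclusive, non-blocking lock on a per-task lock file.
 * Returns the open handle on success (keep it open while the task runs)
 * or false if somebody else already holds the lock.
 * Function name and lock path are illustrative assumptions.
 */
function acquireTaskLock($lockPath)
{
    $fp = fopen($lockPath, 'c'); // create if missing, never truncate
    if ($fp === false) {
        return false;
    }
    if (!flock($fp, LOCK_EX | LOCK_NB)) {
        fclose($fp);
        return false;
    }
    return $fp;
}

$lockPath = sys_get_temp_dir() . DIRECTORY_SEPARATOR . 'import-stock.lock';

$first  = acquireTaskLock($lockPath); // first run gets the lock
$second = acquireTaskLock($lockPath); // overlapping run is refused

flock($first, LOCK_UN);               // task finished: release the lock
fclose($first);

$third = acquireTaskLock($lockPath);  // the next run can proceed again
fclose($third);
```

A nice property of this variant is that the operating system drops the lock when the process exits, even after a crash, so no stale record is left behind. The price is one lock file per task and no central list of what is running.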

First of all, I didn’t want to repeat the same locking/checking/unlocking code in every command, so I created the taskmanager extension. It is not finished yet, so I haven’t published it.

Technical details

This is an application component that extends the CApplicationComponent class and implements the TaskManager interface:

  interface TaskManager
  {
          public function isRunning($id);
          public function logStarted($id);
          public function logFinished($id);
          public function logTerminated($id);
          // $signal added to match the method description below
          public function sendSignal($id, $signal);
          public function getStatus($id);
          public function getLog($id);
  }

Each command/process has its own ID. The ID is plain text without spaces. Spaces are not forbidden and it will work with them, but I decided to apply the same naming conventions as for class properties in Yii.
Let’s go through the methods:

  • isRunning($id). Takes a process ID as a parameter and returns true if this process is running.
  • logStarted($id). Checks that the process is not already running (unless duplicates are allowed) and adds a record to the pids file. This interface could also be implemented on top of a DB, in which case no file locking would be needed.
  • logFinished($id). Removes the record from the pids file (or from the DB, if implemented that way).
  • logTerminated($id). Nearly the same as the previous one, but it doesn’t check whether the process is running; it tries to remove the record anyway.
  • sendSignal($id, $signal). Sends a signal to the process. The process should check its buffer file while it is running and react to signals. It is not implemented because it requires changes in the command code.
  • getStatus($id). Returns the current status of a given process. It is not implemented either, for the same reason.
  • getLog($id). Should return the log of a process execution. Not implemented either.
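
To make the start/finish contract concrete, here is a small in-memory sketch. It is an illustration only: MiniTaskManager is a hypothetical, trimmed-down interface with just three of the methods, and ArrayTaskManager keeps its records in an array, so it is useful for experiments and tests but not for coordinating separate cron processes.

```php
<?php
// Hypothetical, trimmed-down interface: only the methods needed to show
// the start/finish bookkeeping described above.
interface MiniTaskManager
{
    public function isRunning($id);
    public function logStarted($id);
    public function logFinished($id);
}

// In-memory stand-in: the array plays the role of the pids file.
class ArrayTaskManager implements MiniTaskManager
{
    private $running = array(); // task id => start time

    public function isRunning($id)
    {
        return isset($this->running[$id]);
    }

    public function logStarted($id)
    {
        if ($this->isRunning($id)) {
            throw new RuntimeException('Process is running');
        }
        $this->running[$id] = date('Y-m-d H:i:s');
        return $this->running[$id];
    }

    public function logFinished($id)
    {
        if (!$this->isRunning($id)) {
            throw new RuntimeException('Process is not running!');
        }
        unset($this->running[$id]);
    }
}

$tm = new ArrayTaskManager();
$tm->logStarted('ChangeDB');

$refused = false;
try {
    $tm->logStarted('ChangeDB'); // second start while the first is active
} catch (RuntimeException $e) {
    $refused = true;
}

$tm->logFinished('ChangeDB');
```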

And now let’s take a look at the class implementing it.
First of all, its initialization:

  class FileTaskManager extends CApplicationComponent implements TaskManager
  {
          public $pidFile;
          public $pidLogFile;
          public $processDir;

          const TRIES_TIMEOUT = 1000;

          public function init()
          {
                  // Fall back to application params when the properties are not configured
                  if (empty($this->pidFile)) $this->pidFile = Yii::app()->params['cmdPidFile'];
                  if (empty($this->pidLogFile)) $this->pidLogFile = Yii::app()->params['cmdPidLogFile'];
                  if (empty($this->processDir)) $this->processDir = Yii::app()->params['cmdProcDir'];
                  // All paths are relative to the application base path
                  $this->pidFile = Yii::app()->basePath.DIRECTORY_SEPARATOR.$this->pidFile;
                  $this->pidLogFile = Yii::app()->basePath.DIRECTORY_SEPARATOR.$this->pidLogFile;
                  $this->processDir = Yii::app()->basePath.DIRECTORY_SEPARATOR.$this->processDir;
                  // Make sure both files exist before anybody tries to lock them
                  if (!file_exists($this->pidFile))
                          touch($this->pidFile);
                  if (!file_exists($this->pidLogFile))
                          touch($this->pidLogFile);
                  parent::init();
          }
          // ……
  }

  • $pidFile is the file where all currently running processes are recorded
  • $pidLogFile is the log file; each line holds a timestamp (YYYY-MM-DD HH:MM:SS), the process ID and “started” or “finished”, separated by tabs
  • $processDir is where log files and exchange files should reside (methods sendSignal, getStatus, getLog)

We get these variables from the config file and prepend the current application path to them. Also, since the class extends CApplicationComponent, we should perform the standard initialization, and we do this by calling parent::init() in the last line.
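
For illustration, the params section of the console config could look roughly like this. The parameter names come from init() above; the paths are my assumptions, chosen only as an example. Since init() prepends Yii::app()->basePath, they are relative to the protected directory, and the directories must already exist because touch() does not create them.

```php
// protected/config/console.php (fragment)
return array(
    // ...
    'params' => array(
        // Example values only: any writable location under basePath will do
        'cmdPidFile'    => 'runtime/taskmanager/pids.txt',
        'cmdPidLogFile' => 'runtime/taskmanager/pids.log',
        'cmdProcDir'    => 'runtime/taskmanager/proc',
    ),
);
```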

Since we’re using a file, it has to be locked against other processes while we’re working with it. So we need a method that opens it and waits if it is busy:

  protected function openLocked($fileName, $lockMode = LOCK_EX)
  {
          // Shared (read) locks open the file for reading, exclusive locks for appending
          $openMode = ($lockMode == LOCK_SH) ? 'r' : 'a';
          $fp = fopen($fileName, $openMode);
          if (!$fp)
                  throw new CException('Cannot open file '.$fileName);
          // microtime(true) returns a float, so the elapsed time below is in seconds
          $startTime = microtime(true);
          do
          {
                  // LOCK_NB makes flock() return immediately instead of blocking,
                  // so the retry loop and the timeout actually take effect
                  $canWrite = flock($fp, $lockMode | LOCK_NB);
                  // If lock not obtained sleep for 0 - 100 milliseconds, to avoid collision and CPU load
                  if (!$canWrite) usleep(rand(0, 100) * 1000);
          }
          while (!$canWrite && (microtime(true) - $startTime) < self::TRIES_TIMEOUT);
          if (!$canWrite)
          {
                  fclose($fp);
                  throw new CException('File is locked', 1);
          }
          // The file is locked, so the caller can now work with it
          return $fp;
  }

The idea is pretty simple: we try to lock the file. If we succeed, we return the file pointer; otherwise we throw an exception. We retry until we reach the timeout. The intervals are random so that processes started simultaneously do not all retry at the same moment.
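
One detail is worth spelling out: a plain flock() call waits until the lock becomes available, so a retry loop with a timeout only does something when the lock is requested with LOCK_NB. The standalone sketch below shows that retry idea outside of Yii. The function name lockWithTimeout() and the 0.3 second timeout are assumptions made for the example.

```php
<?php
/**
 * Retry a non-blocking flock() with random pauses until it succeeds
 * or the timeout (in seconds) runs out. Returns true when locked.
 */
function lockWithTimeout($fp, $lockMode, $timeoutSeconds)
{
    $deadline = microtime(true) + $timeoutSeconds;
    do {
        if (flock($fp, $lockMode | LOCK_NB)) {
            return true;
        }
        // Random pause of 0 to 100 ms so competing processes drift apart
        usleep(mt_rand(0, 100) * 1000);
    } while (microtime(true) < $deadline);
    return false;
}

$path = tempnam(sys_get_temp_dir(), 'pids');

$holder = fopen($path, 'a');
flock($holder, LOCK_EX);                            // somebody holds the file

$waiter  = fopen($path, 'a');
$blocked = lockWithTimeout($waiter, LOCK_EX, 0.3);  // gives up after ~0.3 s

flock($holder, LOCK_UN);                            // holder is done
$granted = lockWithTimeout($waiter, LOCK_EX, 0.3);  // now it succeeds

fclose($waiter);
fclose($holder);
unlink($path);
```
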
And now we can implement our methods:

  public function isRunning($id)
  {
          $fl = $this->openLocked($this->pidFile, LOCK_SH);
          $running = false;
          while (!feof($fl))
          {
                  // Each record looks like "started<TAB>id<TAB>pid"
                  $process = fgets($fl);
                  if (stristr($process, $id."\t"))
                  {
                          $running = true;
                          break;
                  }
          }
          fclose($fl);
          return $running;
  }

  public function logStarted($id, $allowConcurrent = false)
  {
          if ($this->isRunning($id) && !$allowConcurrent)
                  throw new CException('Process is running', 2);
          // Register the process in the pids file
          $fl = $this->openLocked($this->pidFile, LOCK_EX);
          $dtStarted = date('Y-m-d H:i:s');
          $pid = getmypid();
          fwrite($fl, $dtStarted."\t".$id."\t".$pid."\n");
          fclose($fl);
          // Write the "started" line to the log
          $fl = $this->openLocked($this->pidLogFile, LOCK_EX);
          fwrite($fl, $dtStarted."\t".$id."\t"."started"."\n");
          fclose($fl);
          return $dtStarted;
  }

  public function logFinished($id, $time = '')
  {
          if (!$this->isRunning($id))
                  throw new CException('Process is not running!', 3);
          $this->removeRecord($id, $time);
  }

  public function logTerminated($id, $time = '')
  {
          // Same as logFinished(), but without the isRunning() check
          $this->removeRecord($id, $time);
  }

  // Shared by logFinished() and logTerminated(): removes the record of the
  // current process from the pids file and writes a "finished" line to the log
  protected function removeRecord($id, $time = '')
  {
          $pid = getmypid();
          $match = $time ? $time."\t".$id."\t".$pid : $id."\t".$pid;
          // Collect every record that does not belong to this process
          $cont = '';
          $fl = $this->openLocked($this->pidFile, LOCK_SH);
          while (!feof($fl))
          {
                  $process = fgets($fl);
                  if (stristr($process, $match) === false && !empty($process) && $process != "\n")
                          $cont .= $process;
          }
          fclose($fl);
          // Rewrite the pids file without the removed record
          $fl = $this->openLocked($this->pidFile, LOCK_EX);
          ftruncate($fl, 0);
          fwrite($fl, $cont);
          fclose($fl);
          $fl = $this->openLocked($this->pidLogFile, LOCK_EX);
          fwrite($fl, date('Y-m-d H:i:s')."\t".$id."\t"."finished"."\n");
          fclose($fl);
  }

So now we just need to call these methods from our command. But you know, I’m very lazy, so I created a base class for all my commands:

  abstract class GenericCommand extends CConsoleCommand
  {
          protected $pid = 'gen-cmd';
          protected $allowConcurrent = false;

          // Derived classes put their actual work here
          abstract public function runCmd($params);

          public function __construct($name, $runner)
          {
                  // Keep the standard CConsoleCommand initialization
                  parent::__construct($name, $runner);
                  // E.g. ChangeDBCommand becomes "ChangeDB"
                  $this->pid = str_replace('Command', '', get_class($this));
          }

          public function run($params)
          {
                  try
                  {
                          $time = Yii::app()->tm->logStarted($this->pid, $this->allowConcurrent);
                          $this->runCmd($params);
                          Yii::app()->tm->logFinished($this->pid, $time);
                  }
                  catch (Exception $e)
                  {
                          Yii::log('Process '.$this->pid.' error. '.$e->getMessage());
                          echo 'Process '.$this->pid.' error. '.$e->getMessage();
                          // $time is only set if logStarted() succeeded
                          if (!empty($time))
                          {
                                  try
                                  {
                                          Yii::app()->tm->logFinished($this->pid, $time);
                                  }
                                  catch (Exception $e2)
                                  {
                                          Yii::log('Process '.$this->pid.' can\'t be stopped. '.$e2->getMessage());
                                          echo 'Process '.$this->pid.' can\'t be stopped. '.$e2->getMessage();
                                          Yii::app()->tm->logTerminated($this->pid, $time);
                                  }
                          }
                  }
          }
  }

Its constructor creates the process ID from the class name, which is quite a convenient feature. The class also handles start and stop, so all we need to do is implement the runCmd() method in the derived classes, like this:

  class ChangeDBCommand extends GenericCommand
  {
          public function runCmd($args)
          {
                  $shops = Shop::model()->findAll();
                  foreach ($shops as $shop)
                  {
                          $shop->save();
                  }
                  $categories = Category::model()->findAll();
                  foreach ($categories as $cat)
                  {
                          $cat->save();
                  }
          }
  }

Then you just type:

  $ php console.php changeDB &
  $ php console.php changeDB

Can’t start process! It is already running!

The first command starts the routine in the background. The second starts the same command in the foreground, so you’ll see its output: it can’t be started because the previous run is not finished yet.

There is one thing you have to do to make this work: add the component to the application configuration. Open the protected/config/console.php file and add this to the components list:

  'tm'=>array(
          'class' => 'FileTaskManager'
  )

And into the import section:

  'import'=>array(
          'application.models.*',
          'application.components.*',
          'application.helpers.*',
          'application.extensions.taskmanager.*', //this is added!
  ),

Other problems

I’ve been using this solution for 5 months. Practice shows that sometimes a process runs out of memory, or hits some other error that is not caught by our exception handling, and is terminated without our $tm->logFinished() call. This leads to a situation where the process is not actually running, but the task manager thinks it is because its record is still present in the pids file. In order to prevent such situations, I modified the isRunning routine like this:

  1. public function isRunning($id)
  2. {
  3.         $fl = $this->openLocked($this->pidFile, LOCK_SH);
  4.         $running = $term = false; // $term: a stale record was found
  5.         while (!feof($fl))
  6.         {
  7.                 $process = fgets($fl);
  8.                 if (stristr($process, $id."\t"))
  9.                 {
  10.                         if ($this->isReallyRunning($process))
  11.                         {
  12.                                 $running = true;
  13.                         }
  14.                         else
  15.                         {
  16.                                 $term = true;
  17.                         }
  18.                         break;
  19.                 }
  20.         }
  21.         fclose($fl);
  22.         if ($term) {
  23.                 $this->logTerminated($id);
  24.         }
  25.         return $running;
  26. }
  27.  
  28. protected function isReallyRunning($proc)
  29. {
  30.         list($_, $_, $pid) = explode("\t",$proc);
  31.         $pid = (int) trim($pid); // cast so that only a number reaches the shell
  32.         $res = exec('ps -A|grep '.$pid);
  33.         if (empty($res))
  34.                 return false;
  35.         return true;
  36. }

This performs an additional check: it lists all system processes (ps -A) and looks for the recorded PID in that list (grep [pid]). If the result is empty, the process is not running.
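
Note that grep matches substrings, so PID 123 would also match a line for PID 1234. A stricter check, sketched below as an alternative and not as the code I run, asks the system about that exact PID. It assumes the posix extension is available (with a Linux /proc fallback). The function name isPidAlive() is made up. Keep in mind that posix_kill() with signal 0 can also fail when the process exists but belongs to another user.

```php
<?php
/**
 * Check whether a process with the given PID exists.
 * Signal 0 sends nothing; it only asks the kernel whether the PID is valid.
 */
function isPidAlive($pid)
{
    $pid = (int) $pid;
    if ($pid <= 0) {
        return false;
    }
    if (function_exists('posix_kill')) {
        return posix_kill($pid, 0);
    }
    // Fallback for Linux systems without the posix extension
    return file_exists('/proc/' . $pid);
}

$selfAlive = isPidAlive(getmypid()); // the current process certainly exists
$zeroAlive = isPidAlive(0);          // 0 is rejected as an invalid PID
```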

There is a potential pitfall in the isRunning method. If you try to modify the pids file before it is closed by fclose($fl) in line 21, you’ll run into a deadlock, because the file is still locked by the shared lock and the exclusive lock can’t be obtained. That’s why I call logTerminated only after the file is closed. Be attentive!
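
One more thing to watch, as far as I can tell from the listing: when logTerminated() is called without $time, it builds its match string from getmypid(), which is the PID of the process doing the check and not the PID stored in the stale record. The sketch below shows one way to remove a record by the PID that was actually recorded, comparing whole fields instead of substrings. The helper names parsePidRecord() and dropRecord() and the sample records are hypothetical.

```php
<?php
/**
 * Split a "started<TAB>id<TAB>pid" record into its fields.
 * Returns null for lines that do not have exactly three fields.
 */
function parsePidRecord($line)
{
    $parts = explode("\t", rtrim($line, "\r\n"));
    if (count($parts) !== 3) {
        return null;
    }
    return array('started' => $parts[0], 'id' => $parts[1], 'pid' => (int) $parts[2]);
}

/**
 * Return the records without the one that has exactly this id and pid.
 * Blank lines are dropped; everything else is kept untouched.
 */
function dropRecord(array $lines, $id, $pid)
{
    $kept = array();
    foreach ($lines as $line) {
        $rec = parsePidRecord($line);
        if ($rec !== null && $rec['id'] === $id && $rec['pid'] === (int) $pid) {
            continue; // this is the record to remove
        }
        if (trim($line) !== '') {
            $kept[] = $line;
        }
    }
    return $kept;
}

// Sample data, invented for the example
$lines = array(
    "2011-05-01 19:05:02\tChangeDB\t4321\n",
    "2011-05-01 19:10:02\tImportStock\t4400\n",
);

$kept      = dropRecord($lines, 'ChangeDB', 4321); // removes the first record
$untouched = dropRecord($lines, 'DB', 4321);       // a partial id matches nothing
```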

I’d be thankful if someone takes the code and adds signal processing there and maybe implements DBTaskManager in addition to FileTaskManager. Let me know if someone is interested, we can collaborate on this and deliver a nice ready-made extension.

As usual, any comments are welcome!
