Logo Search packages:      
Sourcecode: beanstalkd version File versions  Download package

binlog.c

/* binlog.c - binary log implementation */

/* Copyright (C) 2008 Graham Barr

 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.

 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.

 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include "config.h"

#if HAVE_STDINT_H
# include <stdint.h>
#endif /* else we get int types from config.h */

#include <stdlib.h>
#include <stdio.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <dirent.h>
#include <sys/resource.h>
#include <sys/param.h>
#include <sys/uio.h>
#include <sys/stat.h>
#include <stdarg.h>
#include <limits.h>
#include <stddef.h>

#include "tube.h"
#include "job.h"
#include "binlog.h"
#include "util.h"
#include "port.h"

/* Handle type used throughout this file: a pointer to struct binlog. */
typedef struct binlog *binlog;

/* Bookkeeping for one binlog file. Instances are chained through `next`
 * into a singly linked list running from oldest_binlog to newest_binlog. */
struct binlog {
  /* the binlog appended after this one by add_binlog(), or NULL */
  binlog next;
  /* reference count, adjusted by binlog_iref() and binlog_dref() */
  unsigned int refs;
  /* file descriptor while the file is open, -1 otherwise */
  int fd;
  /* bytes of the file that have not been reserved */
  size_t free;
  /* bytes reserved for future records but not yet written */
  size_t reserved;
  /* NUL-terminated file path (flexible array member, sized by make_binlog) */
  char path[];
};

/* max size we will create a log file */
size_t binlog_size_limit = BINLOG_SIZE_LIMIT_DEFAULT;

/* When enable_fsync is nonzero, binlog_write_job() calls fdatasync() once at
 * least fsync_throttle_ms milliseconds have passed since last_fsync.
 * last_fsync is a timestamp in milliseconds (now_usec() / 1000). */
int enable_fsync = 0;
size_t fsync_throttle_ms = 0;
uint64_t last_fsync = 0;

/* Directory holding the binlog files. While this is NULL, binlog_init() and
 * make_next_binlog() do nothing. */
char *binlog_dir = NULL;
/* Numeric suffix of the most recently named binlog file: set by
 * binlog_scan_dir() and pre-incremented by make_next_binlog(). */
static int binlog_index = 0;
/* Format version written at the start of every new binlog file and checked
 * when an existing file is read back. */
static int binlog_version = 5;
/* Descriptor of "<binlog_dir>/lock", opened by binlog_lock(). */
static int lock_fd;

/* Ends of the binlog list (oldest_binlog, newest_binlog) and the binlog that
 * binlog_write_job() currently writes to (current_binlog). */
static binlog oldest_binlog = 0,
              current_binlog = 0,
              newest_binlog = 0;

/* Number of leading bytes of struct job (everything before member `pad`)
 * that make up a job record in the log. */
static const size_t job_record_size = offsetof(struct job, pad);

/* Scans binlog_dir for files named "binlog.<N>", where <N> is a plain
 * positive decimal number that fits in an int.
 *
 * Side effect: sets binlog_index to the largest <N> found (0 if none).
 * Returns the smallest <N> found, or 0 if the directory cannot be opened or
 * contains no such file. */
static int
binlog_scan_dir()
{
    DIR *dirp;
    struct dirent *dp;
    long min = 0;
    long max = 0;
    long val;
    char *endptr;
    const char *digits;

    dirp = opendir(binlog_dir);
    if (!dirp) return 0;

    while ((dp = readdir(dirp)) != NULL) {
        if (strncmp("binlog.", dp->d_name, 7) != 0) continue;

        /* strtol() skips leading whitespace and accepts a sign; insisting on
         * a leading digit rejects those forms as well as an empty suffix. */
        digits = dp->d_name + 7;
        if (*digits < '0' || *digits > '9') continue;

        errno = 0;
        val = strtol(digits, &endptr, 10);
        if (*endptr != '\0' || errno == ERANGE) continue;

        /* 0 is the "nothing found" sentinel for both min and max, and the
         * results are narrowed to int below, so only 1..INT_MAX is usable. */
        if (val < 1 || val > INT_MAX) continue;

        if (max == 0 || val > max) max = val;
        if (min == 0 || val < min) min = val;
    }

    closedir(dirp);
    binlog_index = (int) max;
    return (int) min;
}

static void
binlog_remove_oldest()
{
    binlog b = oldest_binlog;

    if (!b) return;

    oldest_binlog = b->next;

    unlink(b->path);
    free(b);
}

/* Takes one reference on b and hands b back so calls can be chained.
 * A NULL argument is returned unchanged. */
static binlog
binlog_iref(binlog b)
{
    if (!b) return NULL;

    b->refs += 1;
    return b;
}

/* Drops one reference on b. When the count reaches zero, every binlog at the
 * old end of the list whose count is zero is removed.
 * A NULL argument is ignored; dropping a reference that was never taken only
 * produces a warning. */
static void
binlog_dref(binlog b)
{
    if (!b) return;

    if (b->refs == 0) {
        twarnx("refs is zero for binlog: %s", b->path);
        return;
    }

    b->refs -= 1;
    if (b->refs != 0) return;

    /* b itself may be freed by this loop, so it is not touched again. */
    while (oldest_binlog != NULL && oldest_binlog->refs == 0) {
        binlog_remove_oldest();
    }
}

/*
static void
binlog_warn(int fd, const char* path, const char *msg)
{
    warnx("WARNING, %s at %s:%u.\n%s", msg, path, lseek(fd, 0, SEEK_CUR),
          "  Continuing. You may be missing data.");
}
*/

/* Warns about a problem found while reading binlog `b`, reporting the file's
 * path and current offset. `fmt` must be a string literal; any extra
 * arguments are consumed by it.
 *
 * The macro expands to a single warnx() call expression, so it can be used
 * in the `return binlog_warn(...)` form. lseek() returns an off_t, which is
 * cast to long long so that it matches its conversion specifier. */
#define binlog_warn(b, fmt, args...) \
    warnx("WARNING, " fmt " at %s:%lld. %s: ", \
          ##args, (b)->path, (long long) lseek((b)->fd, 0, SEEK_CUR), \
          "Continuing. You may be missing data.")

/* Replays one binlog file into the in-memory job set.
 *
 * b           - binlog whose fd is open for reading.
 * binlog_jobs - list that newly recovered jobs are inserted into.
 *
 * Layout as read here: an int version, then a sequence of records made of
 *   size_t namelen, namelen bytes of tube name, job_record_size bytes of
 *   struct job, and - for a READY/DELAYED record with nonzero namelen and
 *   body_size - body_size bytes of job body.
 * A record whose job id is 0 ends the log. Reading stops with a warning at
 * the first truncated or inconsistent record; whatever was replayed before
 * that point is kept. */
static void
binlog_read_log_file(binlog b, job binlog_jobs)
{
    struct job js;
    tube t;
    job j;
    char tubename[MAX_TUBE_NAME_LEN];
    size_t namelen;
    ssize_t r;
    int version;

    r = read(b->fd, &version, sizeof(version));
    if (r == -1) return twarn("read()");
    if ((size_t) r < sizeof(version)) {
        return binlog_warn(b, "EOF while reading version record");
    }

    if (version != binlog_version) {
        return warnx("%s: binlog version mismatch %d %d", b->path, version,
                     binlog_version);
    }

    while (read(b->fd, &namelen, sizeof(size_t)) == sizeof(size_t)) {
        if (namelen >= MAX_TUBE_NAME_LEN) {
            return binlog_warn(b, "namelen %zu exceeds maximum of %d",
                               namelen, MAX_TUBE_NAME_LEN - 1);
        }

        if (namelen > 0) {
            r = read(b->fd, tubename, namelen);
            if (r == -1) return twarn("read()");
            if ((size_t) r < namelen) {
                return binlog_warn(b, "EOF while reading tube name");
            }
        }

        tubename[namelen] = '\0';
        r = read(b->fd, &js, job_record_size);
        if (r == -1) return twarn("read()");
        if ((size_t) r < job_record_size) {
          return binlog_warn(b, "EOF while reading job record");
        }

        if (!js.id) break;

        j = job_find(js.id);
        /* NOTE(review): there is no case for JOB_STATE_BURIED, although
         * binlog_write_job() can emit a buried record that carries a tube
         * name and body. Such a body would not be consumed here - confirm
         * whether those records can occur in practice. */
        switch (js.state) {
        case JOB_STATE_INVALID:
            if (j) {
                job_remove(j);
                binlog_dref(j->binlog);
                job_free(j);
                j = NULL;
            }
            break;
        case JOB_STATE_READY:
        case JOB_STATE_DELAYED:
            if (!j && namelen > 0) {
                t = tube_find_or_make(tubename);
                j = make_job_with_id(js.pri, js.delay, js.ttr, js.body_size,
                                     t, js.id);
                if (!j) return twarnx("OOM");
                j->next = j->prev = j;
                j->created_at = js.created_at;
                job_insert(binlog_jobs, j);
            }
            if (js.body_size && namelen > 0) { /* namelen > 0 only on new jobs */
                if (js.body_size > j->body_size) {
                    /* The body would not fit the buffer allocated for j. */
                    warnx("job size increased from %zu to %zu", j->body_size,
                          js.body_size);
                    job_remove(j);
                    binlog_dref(j->binlog);
                    job_free(j);
                    return binlog_warn(b, "inconsistent job body size");
                }
                r = read(b->fd, j->body, js.body_size);
                if (r == -1) return twarn("read()");
                if ((size_t) r < js.body_size) {
                    warnx("dropping incomplete job %llu",
                          (unsigned long long) j->id);
                    job_remove(j);
                    binlog_dref(j->binlog);
                    job_free(j);
                    return binlog_warn(b, "EOF while reading job body");
                }
            }
            break;
        }
        if (j) {
            j->state = js.state;
            j->deadline_at = js.deadline_at;
            j->pri = js.pri;
            j->delay = js.delay;
            j->ttr = js.ttr;
            j->timeout_ct = js.timeout_ct;
            j->release_ct = js.release_ct;
            j->bury_ct = js.bury_ct;
            j->kick_ct = js.kick_ct;

            /* this is a complete record, so we can move the binlog ref */
            if (namelen && js.body_size) {
                binlog_dref(j->binlog);
                j->binlog = binlog_iref(b);
            }
        }
    }
}

/* Closes b's file and drops the reference that was taken when it was opened.
 * If some of the file is still unreserved (b->free != 0) the file is first
 * truncated to binlog_size_limit - b->free bytes.
 * A NULL binlog, or one that is not open, is left alone. */
static void
binlog_close(binlog b)
{
    if (b == NULL || b->fd < 0) return;

    if (b->free != 0) {
        // Testing the result keeps compilers from warning about an ignored
        // return value.
        if (ftruncate(b->fd, binlog_size_limit - b->free) == -1) {
            // Nothing we can do. The user might see warnings next startup.
        }
    }

    close(b->fd);
    b->fd = -1;
    binlog_dref(b);
}

static binlog
make_binlog(char *path)
{
    binlog b;

    b = (binlog) malloc(sizeof(struct binlog) + strlen(path) + 1);
    if (!b) return twarnx("OOM"), (binlog) 0;
    strcpy(b->path, path);
    b->refs = 0;
    b->next = NULL;
    b->fd = -1;
    b->free = 0;
    b->reserved = 0;
    return b;
}

/* Creates an unopened binlog for the path "<binlog_dir>/binlog.<N>", where
 * <N> is binlog_index + 1.
 *
 * binlog_index is advanced before the path length is checked, so a path
 * that turns out to be too long still consumes an index.
 * Returns NULL if binlog_dir is unset, the path does not fit in PATH_MAX
 * bytes, or make_binlog() fails. */
static binlog
make_next_binlog()
{
    int r;
    char path[PATH_MAX];

    if (!binlog_dir) return NULL;

    r = snprintf(path, sizeof(path), "%s/binlog.%d", binlog_dir,
                 ++binlog_index);
    /* snprintf() reports the length the full string would have had, not
     * counting the NUL, so r == sizeof(path) already means truncation. */
    if (r < 0 || (size_t) r >= sizeof(path)) {
        return twarnx("path too long: %s", binlog_dir), (binlog) 0;
    }

    return make_binlog(path);
}

/* Appends b to the end of the binlog list and returns it.
 * A NULL argument is returned unchanged without touching the list; linking
 * it in would reset newest_binlog and orphan the existing entries. */
static binlog
add_binlog(binlog b)
{
    if (!b) return b;

    if (newest_binlog) newest_binlog->next = b;
    newest_binlog = b;
    if (!oldest_binlog) oldest_binlog = b;

    return b;
}

/* Portable stand-in for posix_fallocate(): writes `len` zero bytes to fd,
 * starting at the descriptor's current position, then seeks back to the
 * start of the file.
 *
 * Only offset == 0 is supported.
 * Returns 0 on success, EINVAL for a nonzero offset or a non-positive len,
 * or the errno value of the write() or lseek() call that failed. */
static int
beanstalkd_fallocate(int fd, off_t offset, off_t len)
{
    off_t done = 0;
    ssize_t w;
    size_t chunk;
    #define ZERO_BUF_SIZE 512
    char buf[ZERO_BUF_SIZE] = {0};

    /* we only support a 0 offset */
    if (offset != 0) return EINVAL;

    if (len <= 0) return EINVAL;

    while (done < len) {
        /* Never write past len: the last chunk may be shorter than buf. */
        chunk = sizeof(buf);
        if ((off_t) chunk > len - done) chunk = (size_t) (len - done);

        w = write(fd, buf, chunk);
        if (w == -1) {
            if (errno == EINTR) continue;
            return errno;
        }
        done += w;
    }

    if (lseek(fd, 0, SEEK_SET) == -1) return errno;

    return 0;
}

/* Creates and opens the file for `log`, preallocates binlog_size_limit bytes
 * for it and writes the version header.
 *
 * log     - binlog to open; NULL is ignored.
 * written - if non-NULL, receives the number of header bytes written, or 0
 *           when the binlog could not be opened.
 *
 * On success log->fd holds the descriptor and log keeps the reference taken
 * here. On failure a warning is printed, log->fd is left untouched and the
 * reference is dropped again. */
static void
binlog_open(binlog log, size_t *written)
{
    int fd, r;
    ssize_t w;

    if (written) *written = 0;

    if (!binlog_iref(log)) return;

    fd = open(log->path, O_WRONLY | O_CREAT, 0400);

    if (fd < 0) {
        twarn("Cannot open binlog %s", log->path);
        binlog_dref(log);
        return;
    }

#ifdef HAVE_POSIX_FALLOCATE
    r = posix_fallocate(fd, 0, binlog_size_limit);
    if (r == EINVAL) {
        r = beanstalkd_fallocate(fd, 0, binlog_size_limit);
    }
#else
    /* Allocate space in a slow but portable way. */
    r = beanstalkd_fallocate(fd, 0, binlog_size_limit);
#endif

    if (r) {
        close(fd);
        errno = r; /* both allocators return the error code instead of setting errno */
        twarn("Cannot allocate space for binlog %s", log->path);
        binlog_dref(log);
        return;
    }

    /* Keep the result signed: stored in a size_t, a -1 from write() would
     * compare as larger than the header and be mistaken for success. */
    w = write(fd, &binlog_version, sizeof(binlog_version));
    if (w < 0 || (size_t) w < sizeof(binlog_version)) {
        twarn("Cannot write to binlog");
        close(fd);
        binlog_dref(log);
        return;
    }

    if (written) *written = (size_t) w;
    log->fd = fd;
}

/* Closes the current binlog and makes its successor the current one.
 * Returns 1 on success, 0 if there is no current binlog or it has no
 * successor (in which case nothing is changed). */
static int
binlog_use_next()
{
    binlog successor = current_binlog ? current_binlog->next : NULL;

    if (successor == NULL) return 0;

    /* assert(current_binlog->reserved == 0); */

    /* The successor was read above because closing may free the old binlog. */
    binlog_close(current_binlog);
    current_binlog = successor;
    return 1;
}

/* Advances to the next binlog if there is one (which closes the binlog that
 * was current), then closes whichever binlog is current afterwards. */
void
binlog_shutdown()
{
    (void) binlog_use_next();
    binlog_close(current_binlog);
}

/* Returns the number of jobs successfully written (either 0 or 1).

   If this fails, something is seriously wrong. It should never fail because of
   a full disk. (The binlog_reserve_space_* functions, on the other hand, can
   fail because of a full disk.)

   If we are not using the binlog at all (!current_binlog), then we pretend to
   have made a successful write and return 1.

   A record is the tube-name length, the tube name, and the first
   job_record_size bytes of the job. The name and the job body are only
   included the first time a ready, delayed or buried job is written (that
   is, while j->binlog is unset); later records carry a zero name length.
   Writing a job in JOB_STATE_INVALID releases the job's binlog reference.

   Every byte written is subtracted from current_binlog->reserved and from
   j->reserved_binlog_space. */
int
binlog_write_job(job j)
{
    ssize_t written;
    size_t tube_namelen, to_write = 0, skip;
    struct iovec vec[4], *vptr;
    int vcnt = 3, r;
    uint64_t now;

    if (!current_binlog) return 1;
    tube_namelen = 0;

    vec[0].iov_base = (char *) &tube_namelen;
    to_write += vec[0].iov_len = sizeof(size_t);

    vec[1].iov_base = j->tube->name;
    vec[1].iov_len = 0;

    vec[2].iov_base = (char *) j;
    to_write += vec[2].iov_len = job_record_size;

    if (j->state == JOB_STATE_READY || j->state == JOB_STATE_DELAYED ||
        j->state == JOB_STATE_BURIED) {
        if (!j->binlog) {
            tube_namelen = strlen(j->tube->name);
            to_write += vec[1].iov_len = tube_namelen;
            vcnt = 4;
            vec[3].iov_base = j->body;
            to_write += vec[3].iov_len = j->body_size;
        }
    } else if (j->state == JOB_STATE_INVALID) {
        if (j->binlog) binlog_dref(j->binlog);
        j->binlog = NULL;
    } else {
        return twarnx("unserializable job state: %d", j->state), 0;
    }

    if (to_write > current_binlog->reserved) {
        r = binlog_use_next();
        if (!r) return twarnx("failed to use next binlog"), 0;
    }

    if (j->state && !j->binlog) j->binlog = binlog_iref(current_binlog);

    while (to_write > 0) {
        written = writev(current_binlog->fd, vec, vcnt);

        if (written < 0) {
            if (errno == EAGAIN) continue;
            if (errno == EINTR) continue;

            twarn("writev");
            binlog_close(current_binlog);
            current_binlog = 0;
            return 0;
        }

        /* Account for the whole write before the iovec bookkeeping below,
         * which works on its own copy of the byte count. */
        to_write -= written;
        current_binlog->reserved -= written;
        j->reserved_binlog_space -= written;

        if (to_write > 0 && written > 0) {
            /* Partial write: drop the iovecs that went out completely and
             * advance the first one that did not. */
            skip = (size_t) written;
            for (vptr = vec; skip >= vptr->iov_len; vptr++) {
                skip -= vptr->iov_len;
                vptr->iov_len = 0;
            }
            vptr->iov_base = (char *) vptr->iov_base + skip;
            vptr->iov_len -= skip;
        }
    }

    now = now_usec() / 1000; /* usec -> msec */
    if (enable_fsync && now - last_fsync >= fsync_throttle_ms) {
        r = fdatasync(current_binlog->fd);
        if (r == -1) return twarn("fdatasync"), 0;
        last_fsync = now;
    }

    return 1;
}

static binlog
make_future_binlog()
{
    binlog b;
    size_t header;

    /* open a new binlog with more space to reserve */
    b = make_next_binlog();
    if (!b) return twarnx("error making next binlog"), (binlog) 0;
    binlog_open(b, &header);

    /* open failed, so we can't reserve any space */
    if (b->fd < 0) {
        free(b);
        return 0;
    }

    b->free = binlog_size_limit - header;
    b->reserved = 0;
    return b;
}

/* Returns 1 if n reserved bytes can be moved from `from` to `to`, i.e. `from`
 * has at least n bytes reserved and `to` has at least n bytes free; otherwise
 * returns 0. */
static int
can_move_reserved(size_t n, binlog from, binlog to)
{
    if (from->reserved < n) return 0;
    if (to->free < n) return 0;
    return 1;
}

/* Transfers n reserved bytes from `from` to `to`: `from` gets them back as
 * free space and `to` gives up the same amount of free space. No bounds are
 * checked here. */
static void
move_reserved(size_t n, binlog from, binlog to)
{
    /* reservation changes hands ... */
    from->reserved -= n;
    to->reserved += n;

    /* ... and the free space moves the other way */
    from->free += n;
    to->free -= n;
}

/* Makes sure the newest binlog has at least n free bytes, appending a brand
 * new binlog to the list when it does not (or when the list is empty).
 * Returns n on success, 0 if a new binlog could not be created. */
static size_t
ensure_free_space(size_t n)
{
    binlog fresh;

    if (!newest_binlog || newest_binlog->free < n) {
        /* open a new binlog */
        fresh = make_future_binlog();
        if (!fresh) {
            twarnx("make_future_binlog");
            return 0;
        }
        add_binlog(fresh);
    }

    return n;
}

/* Preserve some invariants immediately after any space reservation.
 * Invariant 1: current_binlog->reserved >= n.
 * Invariant 2: current_binlog->reserved is congruent to n (mod z), where z
 * is the size of a delete record in the binlog.
 *
 * n is the number of bytes the caller has just reserved.
 * Returns n on success, or 0 when additional free space could not be
 * obtained. On that path the function tries to take the n bytes back out of
 * newest_binlog->reserved. */
static size_t
maintain_invariant(size_t n)
{
    size_t reserved_later, remainder, complement, z, r;

    /* In this function, reserved bytes are conserved (they are neither created
     * nor destroyed). We just move them around to preserve the invariant. We
     * might have to create new free space (i.e. allocate a new binlog file),
     * though. */

    /* Invariant 1. */
    /* This is a loop, but it's guaranteed to run at most once. The proof is
     * left as an exercise for the reader. */
    while (current_binlog->reserved < n) {
        /* The current binlog cannot hold the n bytes, so everything still
         * reserved in it is carried over to the newest binlog before that
         * one is switched to. */
        size_t to_move = current_binlog->reserved;

        r = ensure_free_space(to_move);
        if (r != to_move) {
            twarnx("ensure_free_space");
            /* Undo the caller's reservation. NOTE(review): this assumes the
             * n bytes were reserved in newest_binlog - confirm. */
            if (newest_binlog->reserved >= n) {
                newest_binlog->reserved -= n;
            } else {
                twarnx("failed to unreserve %zd bytes", n); /* can't happen */
            }
            return 0;
        }

        move_reserved(to_move, current_binlog, newest_binlog);
        binlog_use_next();
    }


    /* Invariant 2. */

    /* z: bytes in a record that has no tube name and no body. */
    z = sizeof(size_t) + job_record_size;
    /* What stays reserved in the current binlog once the n bytes are used. */
    reserved_later = current_binlog->reserved - n;
    remainder = reserved_later % z;
    if (remainder == 0) return n;
    /* First choice: pull `complement` bytes in from the newest binlog so the
     * leftover rounds up to a multiple of z. */
    complement = z - remainder;
    if (can_move_reserved(complement, newest_binlog, current_binlog)) {
        move_reserved(complement, newest_binlog, current_binlog);
        return n;
    }

    /* Otherwise push the `remainder` bytes out to the newest binlog so the
     * leftover rounds down to a multiple of z. */
    r = ensure_free_space(remainder);
    if (r != remainder) {
        twarnx("ensure_free_space");
        /* Same undo as in the Invariant 1 failure path above. */
        if (newest_binlog->reserved >= n) {
            newest_binlog->reserved -= n;
        } else {
            twarnx("failed to unreserve %zd bytes", n); /* can't happen */
        }
        return 0;
    }
    move_reserved(remainder, current_binlog, newest_binlog);

    return n;
}

/* Returns the number of bytes successfully reserved: either 0 or n. */
static size_t
binlog_reserve_space(size_t n)
{
    size_t r;

    /* This return value must be nonzero but is otherwise ignored. */
    if (!current_binlog) return 1;

    if (current_binlog->free >= n) {
        current_binlog->free -= n;
        current_binlog->reserved += n;
        return maintain_invariant(n);
    }

    r = ensure_free_space(n);
    if (r != n) return twarnx("ensure_free_space"), 0;

    newest_binlog->free -= n;
    newest_binlog->reserved += n;
    return maintain_invariant(n);
}

/* Reserves binlog space for putting job j: its full initial record plus the
 * short record that will be needed to delete it later.
 * Returns the number of bytes reserved, as reported by
 * binlog_reserve_space(). */
size_t
binlog_reserve_space_put(job j)
{
    /* name length field + job fields, with no tube name and no body */
    const size_t short_record = sizeof(size_t) + job_record_size;

    /* the initial record additionally carries the tube name and the body */
    const size_t initial_record =
        short_record + strlen(j->tube->name) + j->body_size;

    return binlog_reserve_space(initial_record + short_record);
}

/* Reserves binlog space for one update record (name length field plus job
 * fields; no tube name, no body). The job argument is not examined.
 * Returns whatever binlog_reserve_space() returns for that size. */
size_t
binlog_reserve_space_update(job j)
{
    return binlog_reserve_space(sizeof(size_t) + job_record_size);
}

/* Takes a write lock on the whole of "<binlog_dir>/lock", creating the file
 * if needed. The lock is requested with F_SETLK, so the call does not wait
 * if the lock cannot be granted.
 *
 * On success the descriptor stays open in lock_fd.
 * Returns 1 on success, 0 on failure (after printing a warning). */
int
binlog_lock()
{
    int r;
    struct flock lock;
    char path[PATH_MAX];

    r = snprintf(path, sizeof(path), "%s/lock", binlog_dir);
    /* snprintf() reports the untruncated length, so r == sizeof(path)
     * already means the path was cut short. */
    if (r < 0 || (size_t) r >= sizeof(path)) {
        return twarnx("path too long: %s", binlog_dir), 0;
    }

    lock_fd = open(path, O_WRONLY|O_CREAT, 0600);
    if (lock_fd == -1) return twarn("open"), 0;

    lock.l_type = F_WRLCK;
    lock.l_whence = SEEK_SET;
    lock.l_start = 0;
    lock.l_len = 0;
    r = fcntl(lock_fd, F_SETLK, &lock);
    if (r) {
        twarn("fcntl");
        /* No lock is held; do not leave the descriptor open. */
        close(lock_fd);
        lock_fd = -1;
        return 0;
    }

    return 1;
}

/* Prepares the binlog for use. Does nothing when binlog_dir is unset.
 *
 * 1. Creates binlog_dir if it does not exist.
 * 2. Replays every existing "binlog.<N>" file, from the lowest to the
 *    highest index found, inserting recovered jobs into binlog_jobs. Files
 *    that cannot be opened are skipped with a warning.
 * 3. Ensures there is a binlog with free space and makes the newest binlog
 *    the current one.
 *
 * If any step fails fatally a warning is printed and the function returns
 * early without making a binlog current. */
void
binlog_init(job binlog_jobs)
{
    int binlog_index_min;
    struct stat sbuf;
    int fd, idx, r;
    size_t n;
    char path[PATH_MAX];
    binlog b;

    if (!binlog_dir) return;

    /* Recover any jobs in old binlogs */

    if (stat(binlog_dir, &sbuf) < 0) {
        if (mkdir(binlog_dir, 0700) < 0) return twarn("%s", binlog_dir);
    } else if (!S_ISDIR(sbuf.st_mode)) {
        /* S_ISDIR compares the whole file-type field; testing the S_IFDIR
         * bit alone would also accept block devices. */
        twarnx("%s", binlog_dir);
        return;
    }

    binlog_index_min = binlog_scan_dir();

    if (binlog_index_min) {
        for (idx = binlog_index_min; idx <= binlog_index; idx++) {
            r = snprintf(path, sizeof(path), "%s/binlog.%d", binlog_dir, idx);
            /* r == sizeof(path) already means the path was truncated. */
            if (r < 0 || (size_t) r >= sizeof(path)) {
                return twarnx("path too long: %s", binlog_dir);
            }

            fd = open(path, O_RDONLY);

            if (fd < 0) {
                twarn("%s", path);
            } else {
                b = make_binlog(path);
                if (!b) {
                    /* make_binlog() has already printed a warning. */
                    close(fd);
                    return;
                }

                /* Hold a reference while reading so the binlog is not
                 * removed in the middle of the replay. */
                binlog_iref(add_binlog(b));
                b->fd = fd;
                binlog_read_log_file(b, binlog_jobs);
                close(fd);
                b->fd = -1;
                binlog_dref(b);
            }
        }

    }


    /* Set up for writing out new jobs */
    n = ensure_free_space(1);
    if (!n) return twarnx("error making first writable binlog");

    current_binlog = newest_binlog;
}

/* Returns the text that follows the last '.' in the oldest binlog's path
 * (its index, given how binlog paths are built in this file), or "0" when
 * there is no binlog. The returned string is not a copy: it points either
 * into the binlog's own path or at a string literal. */
const char *
binlog_oldest_index()
{
    const char *dot;

    if (oldest_binlog == NULL) return "0";

    dot = strrchr(oldest_binlog->path, '.');
    return dot + 1;
}

/* Returns the text that follows the last '.' in the newest binlog's path,
 * or "0" when there is no binlog. Note that this reports newest_binlog, not
 * current_binlog. The returned string is not a copy: it points either into
 * the binlog's own path or at a string literal. */
const char *
binlog_current_index()
{
    const char *dot;

    if (newest_binlog == NULL) return "0";

    dot = strrchr(newest_binlog->path, '.');
    return dot + 1;
}

Generated by  Doxygen 1.6.0   Back to index