mydumper/binlog.c
2017-03-24 11:42:13 +01:00

285 lines
8.0 KiB
C

/*
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
Authors: Domas Mituzas, Facebook ( domas at fb dot com )
Mark Leith, Oracle Corporation (mark dot leith at oracle dot com)
Andrew Hutchings, SkySQL (andrew at skysql dot com)
*/
#include <glib.h>
#include <glib/gstdio.h>
#include <my_global.h>
#include <mysql.h>
#include <my_sys.h>
#include <mysqld_error.h>
#include <sql_common.h>
#include <string.h>
#include <zlib.h>
#include "mydumper.h"
#include "binlog.h"
#define BINLOG_MAGIC "\xfe\x62\x69\x6e"
#define EVENT_HEADER_LENGTH 19
#define EVENT_ROTATE_FIXED_LENGTH 8
enum event_postions {
EVENT_TIMESTAMP_POSITION= 0,
EVENT_TYPE_POSITION= 4,
EVENT_SERVERID_POSITION= 5,
EVENT_LENGTH_POSITION= 9,
EVENT_NEXT_POSITION= 13,
EVENT_FLAGS_POSITION= 17,
EVENT_EXTRA_FLAGS_POSITION= 19 // currently unused in v4 binlogs, but a good marker for end of header
};
enum event_type {
ROTATE_EVENT= 4,
FORMAT_DESCRIPTION_EVENT= 15,
EVENT_TOO_SHORT= 254 // arbitrary high number, in 5.1 the max event type number is 27 so this should be fine for a while
};
extern int compress_output;
extern gboolean daemon_mode;
extern gboolean shutdown_triggered;
FILE *new_binlog_file(char *binlog_file, const char *binlog_dir);
void close_binlog_file(FILE *outfile);
char *rotate_file_name(const char *buf);
void get_binlogs(MYSQL *conn, struct configuration *conf) {
// TODO: find logs we already have, use start position based on position of last log.
MYSQL_RES *result;
MYSQL_ROW row;
char* last_filename = NULL;
guint64 last_position;
// Only snapshot dump the binlogs once in daemon mode
static gboolean got_binlogs= FALSE;
if (got_binlogs)
return;
else
got_binlogs= TRUE;
if (mysql_query(conn, "SHOW MASTER STATUS")) {
g_critical("Error: Could not execute query: %s", mysql_error(conn));
return;
}
result = mysql_store_result(conn);
if ((row = mysql_fetch_row(result))) {
last_filename= g_strdup(row[0]);
last_position= strtoll(row[1], NULL, 10);
} else {
g_critical("Error: Could not obtain binary log stop position");
if (last_filename != NULL)
g_free(last_filename);
return;
}
mysql_free_result(result);
if (mysql_query(conn, "SHOW BINARY LOGS")) {
g_critical("Error: Could not execute query: %s", mysql_error(conn));
if (last_filename != NULL)
g_free(last_filename);
return;
}
result = mysql_store_result(conn);
while ((row = mysql_fetch_row(result))) {
struct job *j = g_new0(struct job,1);
struct binlog_job *bj = g_new0(struct binlog_job,1);
j->job_data=(void*) bj;
bj->filename=g_strdup(row[0]);
bj->start_position=4;
bj->stop_position= (!strcasecmp(row[0], last_filename)) ? last_position : 0;
j->conf=conf;
j->type=JOB_BINLOG;
g_async_queue_push(conf->queue,j);
}
mysql_free_result(result);
if (last_filename != NULL)
g_free(last_filename);
}
void get_binlog_file(MYSQL *conn, char *binlog_file, const char *binlog_directory, guint64 start_position, guint64 stop_position, gboolean continuous) {
// set serverID = max serverID - threadID to try an eliminate conflicts,
// 0 is bad because mysqld will disconnect at the end of the last log
// dupes aren't too bad since it is up to the client to check for them
uchar buf[128];
// We need to read the raw network packets
NET* net;
net= &conn->net;
unsigned long len;
FILE* outfile;
guint32 event_type;
gboolean read_error= FALSE;
gboolean read_end= FALSE;
gboolean rotated= FALSE;
guint32 server_id= G_MAXUINT32 - mysql_thread_id(conn);
guint64 pos_counter= 0;
int4store(buf, (guint32)start_position);
// Binlog flags (2 byte int)
int2store(buf + 4, 0);
// ServerID
int4store(buf + 6, server_id);
memcpy(buf + 10, binlog_file, strlen(binlog_file));
#if MYSQL_VERSION_ID < 50100
if (simple_command(conn, COM_BINLOG_DUMP, (const char *)buf,
#else
if (simple_command(conn, COM_BINLOG_DUMP, buf,
#endif
strlen(binlog_file) + 10, 1)) {
g_critical("Error: binlog: Critical error whilst requesting binary log");
}
while(1) {
outfile= new_binlog_file(binlog_file, binlog_directory);
if (outfile == NULL) {
g_critical("Error: binlog: Could not create binlog file '%s', %d", binlog_file, errno);
return;
}
write_binlog(outfile, BINLOG_MAGIC, 4);
while(1) {
len = 0;
if (net->vio != 0) len=my_net_read(net);
if ((len == 0) || (len == ~(unsigned long) 0)) {
// Net timeout (set to 1 second)
if (mysql_errno(conn) == ER_NET_READ_INTERRUPTED) {
if (shutdown_triggered) {
close_binlog_file(outfile);
return;
} else {
continue;
}
// A real error
} else {
g_critical("Error: binlog: Network packet read error getting binlog file: %s", binlog_file);
close_binlog_file(outfile);
return;
}
}
if (len < 8 && net->read_pos[0]) {
// end of data
break;
}
pos_counter += len;
event_type= get_event((const char*)net->read_pos + 1, len -1);
switch (event_type) {
case EVENT_TOO_SHORT:
g_critical("Error: binlog: Event too short in binlog file: %s", binlog_file);
read_error= TRUE;
break;
case ROTATE_EVENT:
if (rotated) {
read_end= TRUE;
} else {
len= 1;
rotated= TRUE;
}
break;
default:
// if we get this far this is a normal event to record
break;
}
if (read_error) break;
write_binlog(outfile, (const char*)net->read_pos + 1, len - 1);
if (read_end) {
if (!continuous) {
break;
} else {
g_free(binlog_file);
binlog_file= rotate_file_name((const char*)net->read_pos + 1);
break;
}
}
// stop if we are at requested end of last log
if ((stop_position > 0) && (pos_counter >= stop_position)) break;
}
close_binlog_file(outfile);
if ((!continuous) || (!read_end)) break;
if (continuous && read_end) {
read_end= FALSE;
rotated= FALSE;
}
}
}
char *rotate_file_name(const char *buf) {
guint32 event_length= 0;
// event length is 4 bytes at position 9
event_length= uint4korr(&buf[EVENT_LENGTH_POSITION]);
// event length includes the header, plus a rotate event has a fixed 8byte part we don't need
event_length= event_length - EVENT_HEADER_LENGTH - EVENT_ROTATE_FIXED_LENGTH;
return g_strndup(&buf[EVENT_HEADER_LENGTH + EVENT_ROTATE_FIXED_LENGTH], event_length);
}
FILE *new_binlog_file(char *binlog_file, const char *binlog_dir) {
FILE *outfile;
char* filename;
if (!compress_output) {
filename= g_strdup_printf("%s/%s", binlog_dir, binlog_file);
outfile= g_fopen(filename, "w");
} else {
filename= g_strdup_printf("%s/%s.gz", binlog_dir, binlog_file);
outfile= (void*) gzopen(filename, "w");
}
g_free(filename);
return outfile;
}
void close_binlog_file(FILE *outfile) {
if (!compress_output)
fclose(outfile);
else
gzclose((gzFile) outfile);
}
unsigned int get_event(const char *buf, unsigned int len) {
if (len < EVENT_TYPE_POSITION)
return EVENT_TOO_SHORT;
return buf[EVENT_TYPE_POSITION];
// TODO: Would be good if we can check for valid event type, unfortunately this check can change from version to version
}
void write_binlog(FILE* file, const char* data, guint64 len) {
int err;
if (len > 0) {
int write_result;
if (!compress_output)
write_result= write(fileno(file), data, len);
else
write_result= gzwrite((gzFile)file, data, len);
if (write_result <= 0) {
if (!compress_output)
g_critical("Error: binlog: Error writing binary log: %s", strerror(errno));
else
g_critical("Error: binlog: Error writing compressed binary log: %s", gzerror((gzFile)file, &err));
}
}
}