diff --git a/build.zig b/build.zig
index 4c7c697a3..7360f2eb9 100644
--- a/build.zig
+++ b/build.zig
@@ -43,9 +43,6 @@ pub fn build(b: *Build) void {
     const zstd_dep = b.dependency("zstd", dep_opts);
     const zstd_mod = zstd_dep.module("zstd");
 
-    const curl_dep = b.dependency("curl", dep_opts);
-    const curl_mod = curl_dep.module("curl");
-
     const rocksdb_dep = b.dependency("rocksdb", dep_opts);
     const rocksdb_mod = rocksdb_dep.module("rocksdb-bindings");
 
@@ -70,7 +67,6 @@ pub fn build(b: *Build) void {
     sig_mod.addImport("zig-cli", zig_cli_module);
     sig_mod.addImport("httpz", httpz_mod);
     sig_mod.addImport("zstd", zstd_mod);
-    sig_mod.addImport("curl", curl_mod);
     switch (blockstore_db) {
         .rocksdb => sig_mod.addImport("rocksdb", rocksdb_mod),
         .hashmap => {},
@@ -93,7 +89,6 @@ pub fn build(b: *Build) void {
 
     b.installArtifact(sig_exe);
     sig_exe.root_module.addImport("base58-zig", base58_module);
-    sig_exe.root_module.addImport("curl", curl_mod);
     sig_exe.root_module.addImport("httpz", httpz_mod);
     sig_exe.root_module.addImport("zig-cli", zig_cli_module);
     sig_exe.root_module.addImport("zig-network", zig_network_module);
@@ -139,7 +134,6 @@ pub fn build(b: *Build) void {
     });
     b.installArtifact(unit_tests_exe);
     unit_tests_exe.root_module.addImport("base58-zig", base58_module);
-    unit_tests_exe.root_module.addImport("curl", curl_mod);
     unit_tests_exe.root_module.addImport("httpz", httpz_mod);
     unit_tests_exe.root_module.addImport("zig-network", zig_network_module);
     unit_tests_exe.root_module.addImport("zstd", zstd_mod);
@@ -187,7 +181,6 @@ pub fn build(b: *Build) void {
     benchmark_exe.root_module.addImport("zig-network", zig_network_module);
     benchmark_exe.root_module.addImport("httpz", httpz_mod);
     benchmark_exe.root_module.addImport("zstd", zstd_mod);
-    benchmark_exe.root_module.addImport("curl", curl_mod);
     benchmark_exe.root_module.addImport("prettytable", pretty_table_mod);
     switch (blockstore_db) {
         .rocksdb => benchmark_exe.root_module.addImport("rocksdb", rocksdb_mod),
diff --git a/build.zig.zon b/build.zig.zon
index 9b94d7fde..262fc9d87 100644
--- a/build.zig.zon
+++ b/build.zig.zon
@@ -27,10 +27,6 @@
             .url = "git+https://github.com/Syndica/zstd.zig#5095f011c1183aa67d696172795440d6a33732c9",
             .hash = "122030ebe280b73693963a67ed656226a67b7f00a0a05665155da00c9fcdee90de88",
         },
-        .curl = .{
-            .url = "https://github.com/jiacai2050/zig-curl/archive/8a3f45798a80a5de4c11c6fa44dab8785c421d27.tar.gz",
-            .hash = "1220f70ac854b59315a8512861e039648d677feb4f9677bd873d6b9b7074a5906485",
-        },
         .rocksdb = .{
             .url = "https://github.com/Syndica/rocksdb-zig/archive/6d4230e131183cccb730a7248bd4ca30c559b8bd.tar.gz",
             .hash = "12207766d25ba350d6e2f2153fc74a2b3ff204224e1c08adf211cd9e400075033898",
diff --git a/src/accountsdb/download.zig b/src/accountsdb/download.zig
index 48447dc8a..4d256f3cd 100644
--- a/src/accountsdb/download.zig
+++ b/src/accountsdb/download.zig
@@ -1,7 +1,6 @@
 //! logic for downloading a snapshot
 
 const std = @import("std");
-const curl = @import("curl");
 const sig = @import("../sig.zig");
 
 const SlotAndHash = sig.accounts_db.snapshots.SlotAndHash;
@@ -21,17 +20,21 @@ const parallelUnpackZstdTarBall = sig.accounts_db.parallelUnpackZstdTarBall;
 
 const DOWNLOAD_PROGRESS_UPDATES_NS = 6 * std.time.ns_per_s;
 
-// The identifier for the scoped logger used in this file.
-const LOG_SCOPE = "accountsdb.download";
+const BYTE_PER_KIB = 1024;
+const BYTE_PER_MIB = 1024 * BYTE_PER_KIB;
+const BYTE_PER_GIB = 1024 * BYTE_PER_MIB;
+
+/// The scope for the logger used in this file.
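+/// applied below via logger_.withScope(LOG_SCOPE), tagging all log output from this module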
+pub const LOG_SCOPE = "accountsdb.download";
 
 /// Analogous to [PeerSnapshotHash](https://github.com/anza-xyz/agave/blob/f868aa38097094e4fb78a885b6fb27ce0e43f5c7/validator/src/bootstrap.rs#L342)
-const PeerSnapshotHash = struct {
+pub const PeerSnapshotHash = struct {
     contact_info: ThreadSafeContactInfo,
     full_snapshot: SlotAndHash,
     inc_snapshot: ?SlotAndHash,
 };
 
-const PeerSearchResult = struct {
+pub const PeerSearchResult = struct {
     is_me_count: usize = 0,
     invalid_shred_version: usize = 0,
     no_rpc_count: usize = 0,
@@ -169,12 +172,12 @@ pub fn findPeersToDownloadFromAssumeCapacity(
 pub fn downloadSnapshotsFromGossip(
     allocator: std.mem.Allocator,
     logger_: Logger,
-    // if null, then we trust any peer for snapshot download
+    /// if null, then we trust any peer for snapshot download
     maybe_trusted_validators: ?[]const Pubkey,
     gossip_service: *GossipService,
     output_dir: std.fs.Dir,
     min_mb_per_sec: usize,
-) !void {
+) !struct { std.fs.File, ?std.fs.File } {
     const logger = logger_.withScope(LOG_SCOPE);
     logger
         .info()
@@ -192,7 +195,7 @@ pub fn downloadSnapshotsFromGossip(
     defer slow_peer_pubkeys.deinit();
 
     while (true) {
-        std.time.sleep(std.time.ns_per_s * 5); // wait while gossip table updates
+        std.time.sleep(5 * std.time.ns_per_s); // wait while gossip table updates
 
         // only hold gossip table lock for this block
         {
@@ -227,321 +230,196 @@ pub fn downloadSnapshotsFromGossip(
                 .logf("searched for snapshot peers: {s}", .{write_buf[0..i]});
         }
 
+        const bStr = sig.utils.fmt.boundedString;
+        const bFmt = sig.utils.fmt.boundedFmt;
+        const FullSnapshotFileInfo = sig.accounts_db.snapshots.FullSnapshotFileInfo;
+        const IncrementalSnapshotFileInfo = sig.accounts_db.snapshots.IncrementalSnapshotFileInfo;
+
+        const download_buffer = try allocator.alloc(u8, 1 * BYTE_PER_MIB);
+        defer allocator.free(download_buffer);
+
         for (available_snapshot_peers.items) |peer| {
+            const rpc_socket = peer.contact_info.rpc_addr.?;
+            const rpc_url = rpc_socket.toString();
+
             // download the full snapshot
-            const snapshot_filename_bounded = sig.accounts_db.snapshots.FullSnapshotFileInfo.snapshotArchiveName(.{
+            const snapshot_filename = FullSnapshotFileInfo.snapshotArchiveName(.{
                 .slot = peer.full_snapshot.slot,
                 .hash = peer.full_snapshot.hash,
             });
-            const snapshot_filename = snapshot_filename_bounded.constSlice();
-
-            const rpc_socket = peer.contact_info.rpc_addr.?;
-            const rpc_url_bounded = rpc_socket.toString();
-            const rpc_url = rpc_url_bounded.constSlice();
-
-            const bStr = sig.utils.fmt.boundedString;
-            const snapshot_url_bounded = sig.utils.fmt.boundedFmt("http://{s}/{s}\x00", .{
-                bStr(&rpc_url_bounded),
-                bStr(&snapshot_filename_bounded),
+            const snapshot_url = bFmt("http://{s}/{s}", .{
+                bStr(&rpc_url), bStr(&snapshot_filename),
             });
-            const snapshot_url = snapshot_url_bounded.constSlice()[0 .. snapshot_url_bounded.len - 1 :0];
-
-            logger
-                .info()
-                .logf("downloading full_snapshot from: {s}", .{snapshot_url});
+            const snapshot_uri = std.Uri.parse(snapshot_url.constSlice()) catch {
+                const url_str = snapshot_url.constSlice();
+                std.debug.panic("Failed to Uri.parse '{s}'", .{url_str});
+            };
 
-            downloadFile(
+            logger.info().logf(
+                "downloading full_snapshot from: {s}",
+                .{snapshot_url.constSlice()},
+            );
+            const full_archive_file = downloadFile(
                 allocator,
                 logger,
-                snapshot_url,
+                snapshot_uri,
                 output_dir,
-                snapshot_filename,
+                snapshot_filename.constSlice(),
                 min_mb_per_sec,
+                download_buffer,
             ) catch |err| {
                 switch (err) {
-                    // if we hit this error, then the error should have been printed in the
-                    // downloadFile function
-                    error.Unexpected => {},
                     error.TooSlow => {
                         logger.info().logf("peer is too slow, skipping", .{});
                         try slow_peer_pubkeys.append(peer.contact_info.pubkey);
                     },
-                    else => {
-                        logger.info().logf("failed to download full_snapshot: {s}", .{@errorName(err)});
-                    },
+                    else => logger.info().logf(
+                        "failed to download full_snapshot: {s}",
+                        .{@errorName(err)},
+                    ),
                 }
                 continue;
             };
+            errdefer comptime unreachable;
 
             // download the incremental snapshot
-            // PERF: maybe do this in another thread? while downloading the full snapshot
-            if (peer.inc_snapshot) |inc_snapshot| {
-                const inc_snapshot_filename = try std.fmt.allocPrint(allocator, "incremental-snapshot-{d}-{d}-{s}.{s}", .{
-                    peer.full_snapshot.slot,
-                    inc_snapshot.slot,
-                    inc_snapshot.hash,
-                    "tar.zst",
+            const inc_archive_file: ?std.fs.File = blk: {
+                // PERF: maybe do this in another thread? while downloading the full snapshot
+                const inc_snapshot = peer.inc_snapshot orelse break :blk null;
+
+                const inc_snapshot_filename = IncrementalSnapshotFileInfo.snapshotArchiveName(.{
+                    .base_slot = peer.full_snapshot.slot,
+                    .slot = inc_snapshot.slot,
+                    .hash = inc_snapshot.hash,
                 });
-                defer allocator.free(inc_snapshot_filename);
-
-                const inc_snapshot_url = try std.fmt.allocPrintZ(allocator, "http://{s}/{s}", .{
-                    rpc_url,
-                    inc_snapshot_filename,
+                const inc_snapshot_url = bFmt("http://{s}/{s}", .{
+                    bStr(&rpc_url), bStr(&inc_snapshot_filename),
                 });
-                defer allocator.free(inc_snapshot_url);
+                const inc_snapshot_uri = std.Uri.parse(inc_snapshot_url.constSlice()) catch {
+                    const url_str = inc_snapshot_url.constSlice();
+                    std.debug.panic("Failed to Uri.parse '{s}'", .{url_str});
+                };
 
-                logger.info().logf("downloading inc_snapshot from: {s}", .{inc_snapshot_url});
-                _ = downloadFile(
+                logger.info().logf(
+                    "downloading inc_snapshot from: {s}",
+                    .{inc_snapshot_url.constSlice()},
+                );
+                break :blk downloadFile(
                     allocator,
                     logger,
-                    inc_snapshot_url,
+                    inc_snapshot_uri,
                     output_dir,
-                    inc_snapshot_filename,
+                    inc_snapshot_filename.constSlice(),
                     // NOTE: no min limit (we already downloaded the full snapshot at a good speed so this should be ok)
                     null,
+                    download_buffer,
                 ) catch |err| {
                     // failure here is ok (for now?)
                     logger.warn().logf("failed to download inc_snapshot: {s}", .{@errorName(err)});
-                    return;
+                    break :blk null;
                 };
-            }
+            };
 
-            // success
             logger.info().logf("snapshot downloaded finished", .{});
-            return;
+            return .{ full_archive_file, inc_archive_file };
         }
     }
 }
 
-const DownloadProgress = struct {
-    file: std.fs.File,
-    min_mb_per_second: ?usize,
+/// downloads a file from a url into output_dir/filename
+/// returns error if it fails.
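+/// on success, returns the open archive file, which the caller is responsible for closing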
+/// the main errors include {NoContentLength, TooSlow} or a std.http-related error
+fn downloadFile(
+    allocator: std.mem.Allocator,
     logger: ScopedLogger(LOG_SCOPE),
+    uri: std.Uri,
+    output_dir: std.fs.Dir,
+    filename: []const u8,
+    maybe_min_mib_per_second: ?usize,
+    /// Used as an intermediate buffer to read the response body before writing to disk.
+    /// Recommended size is at least 1 MiB for payloads which are expected to occupy 1 GiB or more.
+    download_buffer: []u8,
+) !std.fs.File {
+    var http_client: std.http.Client = .{ .allocator = allocator };
+    defer http_client.deinit();
+
+    var server_header_buffer: [4096]u8 = undefined;
+    var request = try http_client.open(.GET, uri, .{
+        .server_header_buffer = &server_header_buffer,
+    });
+    defer request.deinit();
 
-    progress_timer: sig.time.Timer,
-    bytes_read: u64 = 0,
-    total_read: u64 = 0,
-    has_checked_speed: bool = false,
-
-    const Self = @This();
-
-    fn init(
-        logger: ScopedLogger(LOG_SCOPE),
-        output_dir: std.fs.Dir,
-        filename: []const u8,
-        download_size: usize,
-        min_mb_per_second: ?usize,
-    ) !Self {
-        const file = try output_dir.createFile(filename, .{});
-        // resize the file
-        try file.setEndPos(download_size);
-
-        return .{
-            .logger = logger,
-            .file = file,
-            .min_mb_per_second = min_mb_per_second,
-            .progress_timer = try sig.time.Timer.start(),
-        };
-    }
+    try request.send();
+    try request.finish();
+    try request.wait();
 
-    pub fn resetTimer(self: *Self) void {
-        self.progress_timer.reset();
-    }
+    const download_size = request.response.content_length orelse
+        return error.NoContentLength;
 
-    fn deinit(self: *Self) void {
-        self.file.close();
+    if (download_buffer.len < 1 * BYTE_PER_MIB and
+        download_size >= BYTE_PER_GIB)
+    {
+        logger.warn().logf("Downloading file of size {} using a buffer of size {};" ++
+            " recommended buffer size for such a payload is at least 1 MiB.", .{
+            std.fmt.fmtIntSizeBin(download_size),
+            std.fmt.fmtIntSizeBin(download_buffer.len),
+        });
     }
 
-    fn writeCallback(
-        ptr: [*c]c_char,
-        size: c_uint,
-        nmemb: c_uint,
-        user_data: *anyopaque,
-    ) callconv(.C) c_uint {
-        std.debug.assert(size == 1); // size will always be 1
-        const len = size * nmemb;
-        const self: *Self = @alignCast(@ptrCast(user_data));
-        var typed_data: [*]u8 = @ptrCast(ptr);
-        const buf = typed_data[0..len];
-
-        self.file.writeAll(buf) catch |err| {
-            std.debug.print("failed to write to file: {s}", .{@errorName(err)});
-            return 0; // trigger a callback error, "size" will always be > 0
-        };
-        self.bytes_read += len;
-        self.total_read += len;
+    const output_file = try output_dir.createFile(filename, .{});
+    errdefer output_file.close();
+    try output_file.setEndPos(download_size);
+    var buffered_out = std.io.bufferedWriter(output_file.writer());
 
-        return len;
-    }
+    var total_bytes_read: u64 = 0;
+    var lap_timer = try sig.time.Timer.start();
+    var full_timer = try sig.time.Timer.start();
+    var checked_speed = false;
 
-    fn progressCallback(
-        user_data: *anyopaque,
-        download_total: c_ulong,
-        download_now: c_ulong,
-        upload_total: c_ulong,
-        upload_now: c_ulong,
-    ) callconv(.C) c_uint {
-        const self: *Self = @alignCast(@ptrCast(user_data));
-
-        // we're only downloading
-        std.debug.assert(upload_total == 0);
-        std.debug.assert(upload_now == 0);
-        const elapsed_ns = self.progress_timer.read().asNanos();
-        if (elapsed_ns > DOWNLOAD_PROGRESS_UPDATES_NS) {
-            defer {
-                self.bytes_read = 0;
-                self.progress_timer.reset();
-            }
+    while (true) {
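+        // stream the body: read up to download_buffer.len bytes per iteration, writing each chunk to the file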
+        const max_bytes_to_read = @min(download_buffer.len, download_size - total_bytes_read);
+        const bytes_read = try request.readAll(download_buffer[0..max_bytes_to_read]);
+        total_bytes_read += bytes_read;
+
+        try buffered_out.writer().writeAll(download_buffer[0..bytes_read]);
+        if (total_bytes_read == download_size) break;
+        std.debug.assert(total_bytes_read < download_size);
+
+        const elapsed_since_start = full_timer.read();
+        const elapsed_since_prev_lap = lap_timer.read();
+        if (elapsed_since_prev_lap.asNanos() <= DOWNLOAD_PROGRESS_UPDATES_NS) continue;
+        defer lap_timer.reset(); // reset at the end of the iteration, after the update, right before the next read & write.
+
+        const total_bytes_left = download_size - total_bytes_read;
+        const time_left_ns = total_bytes_left * (elapsed_since_start.asNanos() / total_bytes_read);
+        logger.info().logf("[download progress]: {d}% done ({:.4}/s - {:.4}/{:.4}) (time left: {d})", .{
+            total_bytes_read * 100 / download_size,
+            std.fmt.fmtIntSizeBin(total_bytes_read / elapsed_since_start.asSecs()),
+            std.fmt.fmtIntSizeBin(total_bytes_read),
+            std.fmt.fmtIntSizeBin(download_size),
+            std.fmt.fmtDuration(time_left_ns),
+        });
 
-            const mb_read = self.bytes_read / 1024 / 1024;
-            if (mb_read == 0) {
-                self.logger.info().logf("download speed is too slow (<1MB/s) -- disconnecting", .{});
-                return 1; // abort from callback
-            }
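+        // the minimum-speed check below only needs to run once, shortly after the download starts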
+        if (checked_speed) continue;
+        checked_speed = true;
 
-            const elapsed_sec = elapsed_ns / std.time.ns_per_s;
-            const ns_per_mb = elapsed_ns / mb_read;
-            const mb_left = (download_total - download_now) / 1024 / 1024;
-            const time_left_ns = mb_left * ns_per_mb;
-            const mb_per_second = mb_read / elapsed_sec;
-
-            const should_check_speed = self.min_mb_per_second != null and !self.has_checked_speed;
-            if (should_check_speed) {
-                // dont check again
-                self.has_checked_speed = true;
-                if (mb_per_second < self.min_mb_per_second.?) {
-                    // not fast enough => abort
-                    self.logger.info().logf(
-                        "[download progress]: speed is too slow ({}/s) -- disconnecting",
-                        .{std.fmt.fmtIntSizeDec(download_now / elapsed_sec)},
-                    );
-                    return 1; // abort from callback
-                } else {
-                    self.logger.info().logf("[download progress]: speed is ok ({d} MB/s) -- maintaining", .{mb_per_second});
-                }
-            }
+        const min_bytes_per_second = BYTE_PER_MIB * (maybe_min_mib_per_second orelse continue);
+        const actual_bytes_per_second = total_bytes_read / elapsed_since_start.asSecs();
 
-            self.logger.info().logf("[download progress]: {d}% done ({:.4}/s - {:.4}/{:.4}) (time left: {d})", .{
-                self.total_read * 100 / download_total,
-                std.fmt.fmtIntSizeDec(self.bytes_read / elapsed_sec),
-                std.fmt.fmtIntSizeDec(download_now),
-                std.fmt.fmtIntSizeDec(download_total),
-                std.fmt.fmtDuration(time_left_ns),
-            });
+        if (actual_bytes_per_second < min_bytes_per_second) {
+            // not fast enough => abort
+            logger.info().logf(
+                "[download progress]: speed is too slow ({:.4}/s) -- disconnecting",
+                .{std.fmt.fmtIntSizeBin(actual_bytes_per_second)},
+            );
+            return error.TooSlow;
         }
-        return 0;
-    }
-};
-
-fn checkCode(code: curl.libcurl.CURLcode) !void {
-    if (code == curl.libcurl.CURLE_OK) {
-        return;
-    }
-    // https://curl.se/libcurl/c/libcurl-errors.html
-    std.log.debug("curl err code:{d}, msg:{s}\n", .{ code, curl.libcurl.curl_easy_strerror(code) });
-    return error.Unexpected;
-}
-
-fn setNoBody(self: curl.Easy, no_body: bool) !void {
-    try checkCode(curl.libcurl.curl_easy_setopt(
-        self.handle,
-        curl.libcurl.CURLOPT_NOBODY,
-        @as(c_long, @intFromBool(no_body)),
-    ));
-}
-
-fn setProgressFunction(
-    self: curl.Easy,
-    func: *const fn (*anyopaque, c_ulong, c_ulong, c_ulong, c_ulong) callconv(.C) c_uint,
-) !void {
-    try checkCode(curl.libcurl.curl_easy_setopt(
-        self.handle,
-        curl.libcurl.CURLOPT_XFERINFOFUNCTION,
-        func,
-    ));
-}
-
-fn setProgressData(
-    self: curl.Easy,
-    data: *const anyopaque,
-) !void {
-    try checkCode(curl.libcurl.curl_easy_setopt(
-        self.handle,
-        curl.libcurl.CURLOPT_XFERINFODATA,
-        data,
-    ));
-}
-
-fn enableProgress(
-    self: curl.Easy,
-) !void {
-    try checkCode(curl.libcurl.curl_easy_setopt(
-        self.handle,
-        curl.libcurl.CURLOPT_NOPROGRESS,
-        @as(c_long, 0),
-    ));
-}
-
-/// downloads a file from a url into output_dir/filename
-/// returns error if it fails.
-/// the main errors include {HeaderRequestFailed, NoContentLength, TooSlow} or a curl-related error
-fn downloadFile(
-    allocator: std.mem.Allocator,
-    logger: ScopedLogger(LOG_SCOPE),
-    url: [:0]const u8,
-    output_dir: std.fs.Dir,
-    filename: []const u8,
-    min_mb_per_second: ?usize,
-) !void {
-    var easy = try curl.Easy.init(allocator, .{});
-    defer easy.deinit();
-
-    try easy.setUrl(url);
-    try easy.setMethod(.HEAD);
-    try setNoBody(easy, true);
-    var head_resp = easy.perform() catch {
-        return error.HeaderRequestFailed;
-    };
-
-    var download_size: usize = 0;
-    if (try head_resp.getHeader("content-length")) |content_length| {
-        download_size = try std.fmt.parseInt(usize, content_length.get(), 10);
-    } else {
-        logger.debug().logf("header request didnt have content-length...", .{});
-        return error.NoContentLength;
+        logger.info().logf(
+            "[download progress]: speed is ok ({:.4}/s) -- maintaining",
+            .{std.fmt.fmtIntSizeBin(actual_bytes_per_second)},
+        );
     }
-    // timeout will need to be larger
-    easy.timeout_ms = std.time.ms_per_hour * 5; // 5 hours is probs too long but its ok
-    var download_progress = try DownloadProgress.init(
-        logger,
-        output_dir,
-        filename,
-        download_size,
-        min_mb_per_second,
-    );
-    errdefer output_dir.deleteFile(filename) catch {};
-    defer download_progress.deinit();
-
-    try setNoBody(easy, false); // full download
-    try easy.setUrl(url);
-    try easy.setMethod(.GET);
-    try easy.setWritedata(&download_progress);
-    try easy.setWritefunction(DownloadProgress.writeCallback);
-    try setProgressData(easy, &download_progress);
-    try setProgressFunction(easy, DownloadProgress.progressCallback);
-    try enableProgress(easy);
-
-    download_progress.resetTimer();
-    var resp = try easy.perform();
-    defer resp.deinit();
-
-    const full_download = download_progress.total_read == download_size;
-    // this if block should only be hit if the download was too slow
-    if (!full_download) {
-        return error.TooSlow;
-    }
+    try buffered_out.flush();
+    return output_file;
 }
 
 pub fn getOrDownloadAndUnpackSnapshot(
@@ -609,7 +487,7 @@ pub fn getOrDownloadAndUnpackSnapshot(
             return error.SnapshotsNotFoundAndNoGossipService;
         };
 
-        try downloadSnapshotsFromGossip(
+        const full, const maybe_inc = try downloadSnapshotsFromGossip(
            allocator,
            logger.unscoped(),
            options.trusted_validators,
@@ -617,6 +495,8 @@ pub fn getOrDownloadAndUnpackSnapshot(
             snapshot_dir,
             @intCast(min_mb_per_sec),
         );
+        defer full.close();
+        defer if (maybe_inc) |inc| inc.close();
     }
 
     const valid_accounts_folder = blk: {
diff --git a/src/cmd/cmd.zig b/src/cmd/cmd.zig
index e4a57da0d..15b40d487 100644
--- a/src/cmd/cmd.zig
+++ b/src/cmd/cmd.zig
@@ -1531,7 +1531,7 @@ fn downloadSnapshot() !void {
     var snapshot_dir = try std.fs.cwd().makeOpenPath(snapshot_dir_str, .{});
     defer snapshot_dir.close();
 
-    try downloadSnapshotsFromGossip(
+    const full_file, const maybe_inc_file = try downloadSnapshotsFromGossip(
         gpa_allocator,
         app_base.logger.unscoped(),
         if (trusted_validators) |trusted| trusted.items else null,
@@ -1539,6 +1539,8 @@ fn downloadSnapshot() !void {
         snapshot_dir,
         @intCast(min_mb_per_sec),
     );
+    defer full_file.close();
+    defer if (maybe_inc_file) |inc_file| inc_file.close();
 }
 
 fn getTrustedValidators(allocator: Allocator) !?std.ArrayList(Pubkey) {
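// ---
// For reference: a minimal, self-contained sketch of the curl-free download
// pattern this diff adopts. This is not part of the diff itself: the URL, the
// output filename, and the explicit end-of-stream check are illustrative
// assumptions, while the std.http.Client calls (open / send / finish / wait /
// readAll) mirror the ones used in the new downloadFile above.
const std = @import("std");

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();

    var http_client: std.http.Client = .{ .allocator = allocator };
    defer http_client.deinit();

    // placeholder URL; the real code builds it from a gossip peer's RPC address
    const uri = try std.Uri.parse("http://127.0.0.1:8899/snapshot.tar.zst");

    var server_header_buffer: [4096]u8 = undefined;
    var request = try http_client.open(.GET, uri, .{
        .server_header_buffer = &server_header_buffer,
    });
    defer request.deinit();

    try request.send();
    try request.finish();
    try request.wait();

    // the Content-Length header tells us how many bytes to expect
    const download_size = request.response.content_length orelse
        return error.NoContentLength;

    const output_file = try std.fs.cwd().createFile("snapshot.tar.zst", .{});
    defer output_file.close();
    try output_file.setEndPos(download_size);
    var buffered_out = std.io.bufferedWriter(output_file.writer());

    // intermediate buffer between the socket and the file, as in downloadFile
    var download_buffer: [1024 * 1024]u8 = undefined;
    var total_bytes_read: u64 = 0;
    while (total_bytes_read < download_size) {
        const max_bytes_to_read = @min(download_buffer.len, download_size - total_bytes_read);
        const bytes_read = try request.readAll(download_buffer[0..max_bytes_to_read]);
        // readAll returns fewer bytes only at end-of-stream; treat a short
        // body as an error instead of looping forever
        if (bytes_read < max_bytes_to_read) return error.EndOfStream;
        total_bytes_read += bytes_read;
        try buffered_out.writer().writeAll(download_buffer[0..bytes_read]);
    }
    try buffered_out.flush();
}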