Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 42 additions & 7 deletions src/cls/rgw/cls_rgw.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1205,6 +1205,7 @@ class BIVerObjEntry {

public:
BIVerObjEntry(cls_method_context_t& _hctx, const cls_rgw_obj_key& _key) : hctx(_hctx), key(_key), initialized(false) {
// empty
}

int init(bool check_delete_marker = true) {
Expand Down Expand Up @@ -1543,11 +1544,20 @@ static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, buffer
return -EINVAL;
}

BIVerObjEntry obj(hctx, op.key);
BIOLHEntry olh(hctx, op.key);

/* read instance entry */
BIVerObjEntry obj(hctx, op.key);
int ret = obj.init(op.delete_marker);

/* NOTE: When a delete is issued, a key instance is always provided,
* either the one for which the delete is requested or a new random
* one when no instance is specified. So we need to see which of
* these two cases we're dealing with. The variable `existed` will
* be true if the instance was specified and false if it was
* randomly generated. It might have been cleaner if the instance
* were empty and randomly generated here and returned in the reply,
* as that would better allow a typo in the instance id. This code
* should be audited and possibly cleaned up. */

bool existed = (ret == 0);
if (ret == -ENOENT && op.delete_marker) {
ret = 0;
Expand All @@ -1556,6 +1566,28 @@ static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, buffer
return ret;
}

BIOLHEntry olh(hctx, op.key);
bool olh_read_attempt = false;
bool olh_found = false;
if (!existed && op.delete_marker) {
/* read olh */
ret = olh.init(&olh_found);
if (ret < 0) {
return ret;
}
olh_read_attempt = true;

// if we're deleting (i.e., adding a delete marker, and the OLH
// indicates it already refers to a delete marker, error out)
if (olh_found && olh.get_entry().delete_marker) {
CLS_LOG(10,
"%s: delete marker received for \"%s\" although OLH"
" already refers to a delete marker\n",
__func__, escape_str(op.key.to_string()).c_str());
return -ENOENT;
}
}

if (existed && !real_clock::is_zero(op.unmod_since)) {
timespec mtime = ceph::real_clock::to_timespec(obj.mtime());
timespec unmod = ceph::real_clock::to_timespec(op.unmod_since);
Expand Down Expand Up @@ -1608,11 +1640,14 @@ static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, buffer
}

/* read olh */
bool olh_found;
ret = olh.init(&olh_found);
if (ret < 0) {
return ret;
if (!olh_read_attempt) { // only read if we didn't attempt earlier
ret = olh.init(&olh_found);
if (ret < 0) {
return ret;
}
olh_read_attempt = true;
}

const uint64_t prev_epoch = olh.get_epoch();

if (!olh.start_modify(op.olh_epoch)) {
Expand Down
26 changes: 20 additions & 6 deletions src/cls/rgw/cls_rgw_types.h
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#ifndef CEPH_CLS_RGW_TYPES_H
#define CEPH_CLS_RGW_TYPES_H
#pragma once

#include <boost/container/flat_map.hpp>
#include "common/ceph_time.h"
#include "common/Formatter.h"

#undef FMT_HEADER_ONLY
#define FMT_HEADER_ONLY 1
#include <fmt/format.h>

#include "rgw/rgw_basic_types.h"

#define CEPH_RGW_REMOVE 'r'
Expand Down Expand Up @@ -340,6 +343,14 @@ struct cls_rgw_obj_key {
cls_rgw_obj_key(const std::string &_name) : name(_name) {}
cls_rgw_obj_key(const std::string& n, const std::string& i) : name(n), instance(i) {}

std::string to_string() const {
return fmt::format("{}({})", name, instance);
}

bool empty() const {
return name.empty();
}

void set(const std::string& _name) {
name = _name;
}
Expand All @@ -348,19 +359,24 @@ struct cls_rgw_obj_key {
return (name.compare(k.name) == 0) &&
(instance.compare(k.instance) == 0);
}

bool operator<(const cls_rgw_obj_key& k) const {
int r = name.compare(k.name);
if (r == 0) {
r = instance.compare(k.instance);
}
return (r < 0);
}

bool operator<=(const cls_rgw_obj_key& k) const {
return !(k < *this);
}
bool empty() const {
return name.empty();

std::ostream& operator<<(std::ostream& out) const {
out << to_string();
return out;
}

void encode(ceph::buffer::list &bl) const {
ENCODE_START(1, 1, bl);
encode(name, bl);
Expand Down Expand Up @@ -1283,5 +1299,3 @@ struct cls_rgw_reshard_entry
void get_key(std::string *key) const;
};
WRITE_CLASS_ENCODER(cls_rgw_reshard_entry)

#endif
12 changes: 9 additions & 3 deletions src/rgw/rgw_rados.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7089,9 +7089,15 @@ static int decode_olh_info(CephContext* cct, const bufferlist& bl, RGWOLHInfo *o
}
}

int RGWRados::apply_olh_log(const DoutPrefixProvider *dpp, RGWObjectCtx& obj_ctx, RGWObjState& state, const RGWBucketInfo& bucket_info, const rgw_obj& obj,
bufferlist& olh_tag, map<uint64_t, vector<rgw_bucket_olh_log_entry> >& log,
uint64_t *plast_ver, rgw_zone_set* zones_trace)
int RGWRados::apply_olh_log(const DoutPrefixProvider *dpp,
RGWObjectCtx& obj_ctx,
RGWObjState& state,
const RGWBucketInfo& bucket_info,
const rgw_obj& obj,
bufferlist& olh_tag,
std::map<uint64_t, std::vector<rgw_bucket_olh_log_entry> >& log,
uint64_t *plast_ver,
rgw_zone_set* zones_trace)
{
if (log.empty()) {
return 0;
Expand Down